Skip to content

Commit

Permalink
Merge pull request #1406 from hyperledger/op-phase
Browse files Browse the repository at this point in the history
Handle idempotent retry after error during initializing phase
  • Loading branch information
peterbroadhurst authored Sep 28, 2023
2 parents 7067eb5 + 9e32841 commit b3443ec
Show file tree
Hide file tree
Showing 51 changed files with 933 additions and 526 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ $(eval $(call makemock, internal/assets, Manager, assetm
$(eval $(call makemock, internal/contracts, Manager, contractmocks))
$(eval $(call makemock, internal/spievents, Manager, spieventsmocks))
$(eval $(call makemock, internal/orchestrator, Orchestrator, orchestratormocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))
$(eval $(call makemock, internal/cache, Manager, cachemocks))
$(eval $(call makemock, internal/metrics, Manager, metricsmocks))
$(eval $(call makemock, internal/operations, Manager, operationmocks))
$(eval $(call makemock, internal/multiparty, Manager, multipartymocks))
$(eval $(call makemock, internal/apiserver, FFISwaggerGen, apiservermocks))
$(eval $(call makemock, internal/apiserver, Server, apiservermocks))

firefly-nocgo: ${GOFILES}
CGO_ENABLED=0 $(VGO) build -o ${BINARY_NAME}-nocgo -ldflags "-X main.buildDate=$(DATE) -X main.buildVersion=$(BUILD_VERSION) -X 'github.com/hyperledger/firefly/cmd.BuildVersionOverride=$(BUILD_VERSION)' -X 'github.com/hyperledger/firefly/cmd.BuildDate=$(DATE)' -X 'github.com/hyperledger/firefly/cmd.BuildCommit=$(GIT_REF)'" -tags=prod -tags=prod -v
Expand Down
2 changes: 1 addition & 1 deletion internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Manager interface {

// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)
}

type assetManager struct {
Expand Down
30 changes: 16 additions & 14 deletions internal/assets/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly/internal/coremsgs"
"github.com/hyperledger/firefly/internal/operations"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/pkg/core"
)
Expand Down Expand Up @@ -99,49 +100,50 @@ func (am *assetManager) PrepareOperation(ctx context.Context, op *core.Operation
}
}

func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, complete bool, err error) {
func (am *assetManager) RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error) {
switch data := op.Data.(type) {
case createPoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, complete, err
phase, err = plugin.CreateTokenPool(ctx, op.NamespacedIDString(), data.Pool)
return nil, phase, err

case activatePoolData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
complete, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, complete, err
phase, err = plugin.ActivateTokenPool(ctx, data.Pool)
return nil, phase, err

case transferData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
switch data.Transfer.Type {
case core.TokenTransferTypeMint:
return nil, false, plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.MintTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
case core.TokenTransferTypeTransfer:
return nil, false, plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.TransferTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
case core.TokenTransferTypeBurn:
return nil, false, plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
err = plugin.BurnTokens(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Transfer, data.Pool.Methods)
default:
panic(fmt.Sprintf("unknown transfer type: %v", data.Transfer.Type))
}
return nil, operations.ErrTernary(err, core.OpPhaseInitializing, core.OpPhasePending), err

case approvalData:
plugin, err := am.selectTokenPlugin(ctx, data.Pool.Connector)
if err != nil {
return nil, false, err
return nil, core.OpPhaseInitializing, err
}
return nil, false, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)
return nil, core.OpPhaseInitializing, plugin.TokensApproval(ctx, op.NamespacedIDString(), data.Pool.Locator, data.Approval, data.Pool.Methods)

default:
return nil, false, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
return nil, core.OpPhaseInitializing, i18n.NewError(ctx, coremsgs.MsgOperationDataIncorrect, op.Data)
}
}

Expand Down
62 changes: 31 additions & 31 deletions internal/assets/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ func TestPrepareAndRunCreatePool(t *testing.T) {
assert.NoError(t, err)

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseComplete, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(createPoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -78,16 +78,16 @@ func TestPrepareAndRunActivatePool(t *testing.T) {

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mdi := am.database.(*databasemocks.Plugin)
mti.On("ActivateTokenPool", context.Background(), pool).Return(true, nil)
mti.On("ActivateTokenPool", context.Background(), pool).Return(core.OpPhaseComplete, nil)
mdi.On("GetTokenPoolByID", context.Background(), "ns1", pool.ID).Return(pool, nil)

po, err := am.PrepareOperation(context.Background(), op)
assert.NoError(t, err)
assert.Equal(t, pool, po.Data.(activatePoolData).Pool)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.True(t, complete)
assert.Equal(t, core.OpPhaseComplete, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -124,9 +124,9 @@ func TestPrepareAndRunTransfer(t *testing.T) {
assert.Equal(t, pool, po.Data.(transferData).Pool)
assert.Equal(t, transfer, po.Data.(transferData).Transfer)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -163,9 +163,9 @@ func TestPrepareAndRunApproval(t *testing.T) {
assert.Equal(t, pool, po.Data.(approvalData).Pool)
assert.Equal(t, approval, po.Data.(approvalData).Approval)

_, complete, err := am.RunOperation(context.Background(), po)
_, phase, err := am.RunOperation(context.Background(), po)

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down Expand Up @@ -352,9 +352,9 @@ func TestRunOperationNotSupported(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()

_, complete, err := am.RunOperation(context.Background(), &core.PreparedOperation{})
_, phase, err := am.RunOperation(context.Background(), &core.PreparedOperation{})

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10378", err)
}

Expand All @@ -365,9 +365,9 @@ func TestRunOperationCreatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -384,11 +384,11 @@ func TestRunOperationCreatePool(t *testing.T) {
}

mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(false, nil)
mti.On("CreateTokenPool", context.Background(), "ns1:"+op.ID.String(), pool).Return(core.OpPhaseInitializing, nil)

_, complete, err := am.RunOperation(context.Background(), opCreatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opCreatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -401,9 +401,9 @@ func TestRunOperationActivatePoolBadPlugin(t *testing.T) {
op := &core.Operation{}
pool := &core.TokenPool{}

_, complete, err := am.RunOperation(context.Background(), opActivatePool(op, pool))
_, phase, err := am.RunOperation(context.Background(), opActivatePool(op, pool))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -415,9 +415,9 @@ func TestRunOperationTransferBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
transfer := &core.TokenTransfer{}

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand All @@ -429,9 +429,9 @@ func TestRunOperationApprovalBadPlugin(t *testing.T) {
pool := &core.TokenPool{}
approval := &core.TokenApproval{}

_, complete, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))
_, phase, err := am.RunOperation(context.Background(), opApproval(op, pool, approval))

assert.False(t, complete)
assert.Equal(t, core.OpPhaseInitializing, phase)
assert.Regexp(t, "FF10272", err)
}

Expand Down Expand Up @@ -473,9 +473,9 @@ func TestRunOperationTransferMint(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -501,9 +501,9 @@ func TestRunOperationTransferMintWithInterface(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("MintTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, pool.Methods).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -528,9 +528,9 @@ func TestRunOperationTransferBurn(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("BurnTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand All @@ -555,9 +555,9 @@ func TestRunOperationTransfer(t *testing.T) {
mti := am.tokens["magic-tokens"].(*tokenmocks.Plugin)
mti.On("TransferTokens", context.Background(), "ns1:"+op.ID.String(), "F1", transfer, (*fftypes.JSONAny)(nil)).Return(nil)

_, complete, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))
_, phase, err := am.RunOperation(context.Background(), opTransfer(op, pool, transfer))

assert.False(t, complete)
assert.Equal(t, core.OpPhasePending, phase)
assert.NoError(t, err)

mti.AssertExpectations(t)
Expand Down
30 changes: 20 additions & 10 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ func (am *assetManager) GetTokenApprovals(ctx context.Context, filter ffapi.AndF
}

type approveSender struct {
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
mgr *assetManager
approval *core.TokenApprovalInput
resolved bool
msgSender syncasync.Sender
idempotentSubmit bool
}

func (s *approveSender) Prepare(ctx context.Context) error {
Expand All @@ -58,8 +59,9 @@ func (s *approveSender) setDefaults() {

func (am *assetManager) NewApproval(approval *core.TokenApprovalInput) syncasync.Sender {
sender := &approveSender{
mgr: am,
approval: approval,
mgr: am,
approval: approval,
idempotentSubmit: approval.IdempotencyKey != "",
}
sender.setDefaults()
return sender
Expand Down Expand Up @@ -107,20 +109,28 @@ func (s *approveSender) resolve(ctx context.Context) (opResubmitted bool, err er
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return false, resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
// We resubmitted something - translate the status code to 200 (true return)
s.approval.TX.ID = idemErr.ExistingTXID
s.approval.TX.Type = core.TransactionTypeTokenApproval
return true, nil
}
}
return false, err
if !resubmitWholeTX {
return false, err
}
}
s.approval.TX.ID = txid
s.approval.TX.Type = core.TransactionTypeTokenApproval
Expand Down Expand Up @@ -194,7 +204,7 @@ func (s *approveSender) sendInternal(ctx context.Context, method sendMethod) (er
}
}

_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval))
_, err = s.mgr.operations.RunOperation(ctx, opApproval(op, pool, &s.approval.TokenApproval), s.idempotentSubmit)
return err
}

Expand Down
Loading

0 comments on commit b3443ec

Please sign in to comment.