Skip to content

Commit

Permalink
Handle the case where we fail before writing the operations, but afte…
Browse files Browse the repository at this point in the history
…r creating the TX

Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Sep 27, 2023
1 parent cbb093e commit 3f2a21f
Show file tree
Hide file tree
Showing 13 changed files with 328 additions and 65 deletions.
14 changes: 11 additions & 3 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,28 @@ func (s *approveSender) resolve(ctx context.Context) (opResubmitted bool, err er
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return false, resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
// We resubmitted something - translate the status code to 200 (true return)
s.approval.TX.ID = idemErr.ExistingTXID
s.approval.TX.Type = core.TransactionTypeTokenApproval
return true, nil
}
}
return false, err
if !resubmitWholeTX {
return false, err
}
}
s.approval.TX.ID = txid
s.approval.TX.Type = core.TransactionTypeTokenApproval
Expand Down
67 changes: 64 additions & 3 deletions internal/assets/token_approval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,68 @@ func TestApprovalIdempotentOperationResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return([]*core.Operation{op}, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1, []*core.Operation{op}, nil)

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.TokenApproval(context.Background(), approval, false)
assert.NoError(t, err)

mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestApprovalIdempotentOperationResubmitAll(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
var id = fftypes.NewUUID()

approval := &core.TokenApprovalInput{
TokenApproval: core.TokenApproval{
Approved: true,
Operator: "operator",
Key: "key",
},
IdempotencyKey: "idem1",
}

pool := &core.TokenPool{
Connector: "magic-tokens",
Active: true,
}

mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
mdi := am.database.(*databasemocks.Plugin)
mim := am.identity.(*identitymanagermocks.Manager)

fb := database.TokenPoolQueryFactory.NewFilter(context.Background())
f := fb.And()
f.Limit(1).Count(true)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(0, nil, nil)
mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(pool, nil)
mom.On("AddOrReuseOperation", context.Background(), mock.Anything).Return(nil)
mom.On("RunOperation", context.Background(), mock.Anything, true).Return(nil, nil)

tokenPools := []*core.TokenPool{
{
Name: "pool1",
Locator: "F1",
Connector: "magic-tokens",
Active: true,
},
}
totalCount := int64(1)
filterResult := &ffapi.FilterResult{
TotalCount: &totalCount,
}
mim.On("ResolveInputSigningKey", context.Background(), "key", identity.KeyNormalizationBlockchainPlugin).Return("0x12345", nil)
mdi.On("GetTokenPools", context.Background(), "ns1", mock.MatchedBy((func(f ffapi.AndFilter) bool {
info, _ := f.Finalize()
return info.Count && info.Limit == 1
}))).Return(tokenPools, filterResult, nil)

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.TokenApproval(context.Background(), approval, false)
Expand Down Expand Up @@ -275,7 +336,7 @@ func TestApprovalIdempotentNoOperationToResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1 /* one total */, nil /* none to resubmit */, nil)

// If ResubmitOperations returns nil it's because there was no operation in initialized state, so we expect the regular 409 error back
_, err := am.TokenApproval(context.Background(), approval, false)
Expand Down Expand Up @@ -308,7 +369,7 @@ func TestApprovalIdempotentOperationErrorOnResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenApproval, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, fmt.Errorf("pop"))
mom.On("ResubmitOperations", context.Background(), id).Return(-1, nil, fmt.Errorf("pop"))

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.TokenApproval(context.Background(), approval, false)
Expand Down
15 changes: 12 additions & 3 deletions internal/assets/token_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,28 @@ func (am *assetManager) createTokenPoolInternal(ctx context.Context, pool *core.
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers.
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr = am.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
var total int
total, resubmitted, resubmitErr = am.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
return resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
pool.TX.ID = idemErr.ExistingTXID
pool.TX.Type = core.TransactionTypeTokenPool
err = nil
}
}
return err
if !resubmitWholeTX {
return err
}
}

pool.TX.ID = txid
Expand Down
44 changes: 41 additions & 3 deletions internal/assets/token_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,45 @@ func TestCreateTokenPoolIdempotentResubmit(t *testing.T) {
Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return([]*core.Operation{op}, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1, []*core.Operation{op}, nil)

_, err := am.CreateTokenPool(context.Background(), pool, false)

// SubmitNewTransction returned 409 idempotency clash, ResubmitOperations returned that it resubmitted an operation. Shouldn't
// see the original 409 Conflict error
assert.NoError(t, err)

mdi.AssertExpectations(t)
mim.AssertExpectations(t)
mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestCreateTokenPoolIdempotentResubmitAll(t *testing.T) {
am, cancel := newTestAssets(t)
defer cancel()
var id = fftypes.NewUUID()

pool := &core.TokenPoolInput{
TokenPool: core.TokenPool{
Name: "testpool",
},
IdempotencyKey: "idem1",
}

mdi := am.database.(*databasemocks.Plugin)
mim := am.identity.(*identitymanagermocks.Manager)
mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
mdi.On("GetTokenPool", context.Background(), "ns1", "testpool").Return(nil, nil)
mim.On("ResolveInputSigningKey", context.Background(), "", identity.KeyNormalizationBlockchainPlugin).Return("resolved-key", nil)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenPool, core.IdempotencyKey("idem1")).
Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(0, nil, nil)
mom.On("AddOrReuseOperation", context.Background(), mock.Anything).Return(nil)
mom.On("RunOperation", context.Background(), mock.Anything, true).Return(nil, nil)

_, err := am.CreateTokenPool(context.Background(), pool, false)

Expand Down Expand Up @@ -186,7 +224,7 @@ func TestCreateTokenPoolIdempotentNoOperationToResubmit(t *testing.T) {
Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1 /* total */, nil /* to resubmit */, nil)

_, err := am.CreateTokenPool(context.Background(), pool, false)

Expand Down Expand Up @@ -223,7 +261,7 @@ func TestCreateTokenPoolIdempotentErrorOnOperationResubmit(t *testing.T) {
Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, fmt.Errorf("pop"))
mom.On("ResubmitOperations", context.Background(), id).Return(-1, nil, fmt.Errorf("pop"))

_, err := am.CreateTokenPool(context.Background(), pool, false)

Expand Down
14 changes: 11 additions & 3 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,29 @@ func (s *transferSender) resolve(ctx context.Context) (opResubmitted bool, err e
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers. Note that we'll return the result of resubmitting the operation, not a 409 Conflict error
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := s.mgr.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
if resubmitErr != nil {
// Error doing resubmit, return the new error
err = resubmitErr
}
if len(resubmitted) > 0 {
if total == 0 {
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
} else if len(resubmitted) > 0 {
// We resubmitted something - translate the status code to 200 (true return)
s.transfer.TX.ID = idemErr.ExistingTXID
s.transfer.TX.Type = core.TransactionTypeTokenTransfer
return true, nil
}

}
return false, err
if !resubmitWholeTX {
return false, err
}
}
s.transfer.TX.ID = txid
s.transfer.TX.Type = core.TransactionTypeTokenTransfer
Expand Down
45 changes: 42 additions & 3 deletions internal/assets/token_transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,46 @@ func TestMintTokensIdempotentResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenTransfer, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return([]*core.Operation{op}, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1, []*core.Operation{op}, nil)

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.MintTokens(context.Background(), mint, false)
assert.NoError(t, err)

mth.AssertExpectations(t)
mom.AssertExpectations(t)
}

func TestMintTokensIdempotentResubmitAll(t *testing.T) {
am, cancel := newTestAssetsWithMetrics(t)
defer cancel()
var id = fftypes.NewUUID()

mint := &core.TokenTransferInput{
TokenTransfer: core.TokenTransfer{
Amount: *fftypes.NewFFBigInt(5),
},
Pool: "pool1",
IdempotencyKey: "idem1",
}
pool := &core.TokenPool{
Name: "pool1",
Connector: "magic-tokens",
Active: true,
}

mdi := am.database.(*databasemocks.Plugin)
mim := am.identity.(*identitymanagermocks.Manager)
mth := am.txHelper.(*txcommonmocks.Helper)
mom := am.operations.(*operationmocks.Manager)
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenTransfer, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(0, nil, nil)
mim.On("ResolveInputSigningKey", context.Background(), "", identity.KeyNormalizationBlockchainPlugin).Return("0x12345", nil)
mdi.On("GetTokenPool", context.Background(), "ns1", "pool1").Return(pool, nil)
mom.On("AddOrReuseOperation", context.Background(), mock.Anything).Return(nil)
mom.On("RunOperation", context.Background(), mock.Anything, true).Return(nil, nil)

// If ResubmitOperations returns an operation it's because it found one to resubmit, so we return 2xx not 409, and don't expect an error
_, err := am.MintTokens(context.Background(), mint, false)
Expand Down Expand Up @@ -160,7 +199,7 @@ func TestMintTokensIdempotentNoOperationToResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenTransfer, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, nil)
mom.On("ResubmitOperations", context.Background(), id).Return(1 /* total */, nil /* to resumit */, nil)

// If ResubmitOperations returns nil it's because there was no operation in initialized state, so we expect the regular 409 error back
_, err := am.MintTokens(context.Background(), mint, false)
Expand Down Expand Up @@ -189,7 +228,7 @@ func TestMintTokensIdempotentErrorOnResubmit(t *testing.T) {
mth.On("SubmitNewTransaction", context.Background(), core.TransactionTypeTokenTransfer, core.IdempotencyKey("idem1")).Return(id, &sqlcommon.IdempotencyError{
ExistingTXID: id,
OriginalError: i18n.NewError(context.Background(), coremsgs.MsgIdempotencyKeyDuplicateTransaction, "idem1", id)})
mom.On("ResubmitOperations", context.Background(), id).Return(nil, fmt.Errorf("pop"))
mom.On("ResubmitOperations", context.Background(), id).Return(-1, nil, fmt.Errorf("pop"))

// If ResubmitOperations returns nil it's because there was no operation in initialized state, so we expect the regular 409 error back
_, err := am.MintTokens(context.Background(), mint, false)
Expand Down
34 changes: 26 additions & 8 deletions internal/broadcast/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,27 @@ func (bm *broadcastManager) PublishDataValue(ctx context.Context, id string, ide
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := bm.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := bm.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)

if resubmitErr != nil {
switch {
case resubmitErr != nil:
// Error doing resubmit, return the new error
err = resubmitErr
} else if len(resubmitted) > 0 {
case total == 0:
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
case len(resubmitted) > 0:
// We successfully resubmitted an initialized operation, return 2xx not 409
err = nil
}
}
return d, err
if !resubmitWholeTX {
return d, err
}
}

op := core.NewOperation(
Expand Down Expand Up @@ -275,18 +284,27 @@ func (bm *broadcastManager) PublishDataBlob(ctx context.Context, id string, idem
if err != nil {
// Check if we've clashed on idempotency key. There might be operations still in "Initialized" state that need
// submitting to their handlers
resubmitWholeTX := false
if idemErr, ok := err.(*sqlcommon.IdempotencyError); ok {
resubmitted, resubmitErr := bm.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)
total, resubmitted, resubmitErr := bm.operations.ResubmitOperations(ctx, idemErr.ExistingTXID)

if resubmitErr != nil {
switch {
case resubmitErr != nil:
// Error doing resubmit, return the new error
err = resubmitErr
} else if len(resubmitted) > 0 {
case total == 0:
// We didn't do anything last time - just start again
txid = idemErr.ExistingTXID
resubmitWholeTX = true
err = nil
case len(resubmitted) > 0:
// We successfully resubmitted an initialized operation, return 2xx not 409
err = nil
}
}
return d, err
if !resubmitWholeTX {
return d, err
}
}

if err = bm.uploadDataBlob(ctx, txid, d, idempotencyKey != ""); err != nil {
Expand Down
Loading

0 comments on commit 3f2a21f

Please sign in to comment.