Skip to content

Commit

Permalink
Filter receipt events by namespace when possible
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Richardson <[email protected]>
  • Loading branch information
awrichar committed Nov 26, 2024
1 parent 9a94dc1 commit 8131dad
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 23 deletions.
14 changes: 11 additions & 3 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -392,9 +392,17 @@ func (s *subscriptions) GetSubscription(subID string) *SubscriptionInfo {
}

// Common function for handling receipts from blockchain connectors.
func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error {
func HandleReceipt(ctx context.Context, namespace string, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error {
l := log.L(ctx)

if namespace != "" {
opNamespace, _, _ := core.ParseNamespacedOpID(ctx, reply.Headers.ReceiptID)
if opNamespace != namespace {
l.Debugf("Ignoring operation update from other namespace: request=%s tx=%s message=%s", reply.Headers.ReceiptID, reply.TxHash, reply.Message)
return nil
}
}

if reply.Headers.ReceiptID == "" || reply.Headers.ReplyType == "" {
return fmt.Errorf("reply cannot be processed - missing fields: %+v", reply)
}
Expand All @@ -409,7 +417,7 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece
updateType = core.OpStatusFailed
}

// Slightly upgly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires
// Slightly ugly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires
var output fftypes.JSONObject
obj, err := json.Marshal(reply)
if err != nil {
Expand Down
21 changes: 15 additions & 6 deletions internal/blockchain/common/common_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -342,15 +342,15 @@ func TestGoodSuccessReceipt(t *testing.T) {
cb.SetHandler("ns1", mcb)
mcb.On("OperationUpdate", "ns1", mock.Anything).Return()

err := HandleReceipt(context.Background(), nil, &reply, cb)
err := HandleReceipt(context.Background(), "", nil, &reply, cb)
assert.NoError(t, err)

reply.Headers.ReplyType = "TransactionUpdate"
err = HandleReceipt(context.Background(), nil, &reply, cb)
err = HandleReceipt(context.Background(), "", nil, &reply, cb)
assert.NoError(t, err)

reply.Headers.ReplyType = "TransactionFailed"
err = HandleReceipt(context.Background(), nil, &reply, cb)
err = HandleReceipt(context.Background(), "", nil, &reply, cb)
assert.NoError(t, err)
}

Expand All @@ -365,7 +365,7 @@ func TestReceiptMarshallingError(t *testing.T) {
cb.SetHandler("ns1", mcb)
mcb.On("OperationUpdate", "ns1", mock.Anything).Return()

err := HandleReceipt(context.Background(), nil, &reply, cb)
err := HandleReceipt(context.Background(), "", nil, &reply, cb)
assert.Error(t, err)
assert.Regexp(t, ".*[^n]marshalling error.*", err)
}
Expand All @@ -384,10 +384,19 @@ func TestBadReceipt(t *testing.T) {
data := fftypes.JSONAnyPtr(`{}`)
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
err = HandleReceipt(context.Background(), nil, &reply, nil)
err = HandleReceipt(context.Background(), "", nil, &reply, nil)
assert.Error(t, err)
}

func TestWrongNamespaceReceipt(t *testing.T) {
var reply BlockchainReceiptNotification
data := fftypes.JSONAnyPtr(`{}`)
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
err = HandleReceipt(context.Background(), "wrong", nil, &reply, nil)
assert.NoError(t, err)
}

func TestErrorWrappingConflict(t *testing.T) {
ctx := context.Background()
res := &resty.Response{
Expand Down
4 changes: 2 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (e *Ethereum) eventLoop(namespace string, wsconn wsclient.WSClient, closed
if !isBatch {
var receipt common.BlockchainReceiptNotification
_ = json.Unmarshal(msgBytes, &receipt)
err := common.HandleReceipt(ctx, e, &receipt, e.callbacks)
err := common.HandleReceipt(ctx, namespace, e, &receipt, e.callbacks)
if err != nil {
l.Errorf("Failed to process receipt: %+v", msgTyped)
}
Expand Down Expand Up @@ -1223,7 +1223,7 @@ func (e *Ethereum) GetTransactionStatus(ctx context.Context, operation *core.Ope
TxHash: statusResponse.GetString("transactionHash"),
Message: statusResponse.GetString("errorMessage"),
ProtocolID: receiptInfo.GetString("protocolId")}
err := common.HandleReceipt(ctx, e, receipt, e.callbacks)
err := common.HandleReceipt(ctx, operation.Namespace, e, receipt, e.callbacks)
if err != nil {
log.L(ctx).Warnf("Failed to handle receipt")
}
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)

common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand Down Expand Up @@ -1922,7 +1922,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
assert.NoError(t, err)
expectedReceiptId := "ns1:" + operationID.String()
assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand Down Expand Up @@ -1987,7 +1987,7 @@ func TestHandleMsgBatchBadData(t *testing.T) {
data := fftypes.JSONAnyPtr(`{}`)
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
}

func TestFormatNil(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (f *Fabric) eventLoop(namespace string, wsconn wsclient.WSClient, closed ch
var receipt common.BlockchainReceiptNotification
_ = json.Unmarshal(msgBytes, &receipt)

err := common.HandleReceipt(ctx, f, &receipt, f.callbacks)
err := common.HandleReceipt(ctx, namespace, f, &receipt, f.callbacks)
if err != nil {
l.Errorf("Failed to process receipt: %+v", msgTyped)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand All @@ -1836,7 +1836,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) {
data := []byte(`{}`)
err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
}

func TestHandleReceiptFailedTx(t *testing.T) {
Expand Down Expand Up @@ -1876,7 +1876,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/blockchain/tezos/tezos.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ func (t *Tezos) GetTransactionStatus(ctx context.Context, operation *core.Operat
TxHash: statusResponse.GetString("transactionHash"),
Message: statusResponse.GetString("errorMessage"),
ProtocolID: receiptInfo.GetString("protocolId")}
err := common.HandleReceipt(ctx, t, receipt, t.callbacks)
err := common.HandleReceipt(ctx, operation.Namespace, t, receipt, t.callbacks)
if err != nil {
log.L(ctx).Warnf("Failed to handle receipt")
}
Expand Down Expand Up @@ -822,7 +822,7 @@ func (t *Tezos) eventLoop() {
var receipt common.BlockchainReceiptNotification
_ = json.Unmarshal(msgBytes, &receipt)

err := common.HandleReceipt(ctx, t, &receipt, t.callbacks)
err := common.HandleReceipt(ctx, "", t, &receipt, t.callbacks) // TODO: should be specific to a namespace
if err != nil {
l.Errorf("Failed to process receipt: %+v", msgTyped)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/blockchain/tezos/tezos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)

common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)

tm.AssertExpectations(t)
}
Expand Down Expand Up @@ -780,7 +780,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
assert.NoError(t, err)
expectedReceiptId := "ns1:" + operationID.String()
assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId)
common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)

tm.AssertExpectations(t)
}
Expand All @@ -797,7 +797,7 @@ func TestHandleMsgBatchBadData(t *testing.T) {
data := fftypes.JSONAnyPtr(`{}`)
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)
}

func TestAddSubscription(t *testing.T) {
Expand Down

0 comments on commit 8131dad

Please sign in to comment.