Skip to content

Commit

Permalink
Create standard FireFly receipt structure for blockchain connectors t…
Browse files Browse the repository at this point in the history
…o follow and make handling of receipt common code.

Signed-off-by: Matthew Whitehead <[email protected]>
  • Loading branch information
matthew1001 committed Dec 12, 2022
1 parent 9305de4 commit ea28a80
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 87 deletions.
46 changes: 46 additions & 0 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
"context"
"encoding/hex"
"encoding/json"
"strings"

"github.com/hyperledger/firefly-common/pkg/fftypes"
Expand Down Expand Up @@ -70,6 +71,18 @@ type SubscriptionInfo struct {
Extra interface{}
}

type BlockchainReceiptHeaders struct {
ReceiptID string `json:"requestId,omitempty"`
ReplyType string `json:"type,omitempty"`
}

type BlockchainReceiptNotification struct {
Headers BlockchainReceiptHeaders
TxHash string `json:"transactionHash,omitempty"`
Message string `json:"errorMessage,omitempty"`
ProtocolID string `json:"protocolId,omitempty"`
}

func NewBlockchainCallbacks() BlockchainCallbacks {
return &callbacks{
handlers: make(map[string]blockchain.Callbacks),
Expand Down Expand Up @@ -275,3 +288,36 @@ func (s *subscriptions) RemoveSubscription(ctx context.Context, subID string) {
func (s *subscriptions) GetSubscription(subID string) *SubscriptionInfo {
return s.subs[subID]
}

// Common function for handling receipts from blockchain connectors.
func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) {
l := log.L(ctx)

if reply.Headers.ReceiptID == "" || reply.Headers.ReplyType == "" {
l.Errorf("Reply cannot be processed - missing fields: %+v", reply)
return
}
var updateType core.OpStatus
switch reply.Headers.ReplyType {
case "TransactionSuccess":
updateType = core.OpStatusSucceeded
case "TransactionUpdate":
updateType = core.OpStatusPending
default:
updateType = core.OpStatusFailed
}

// Slightly upgly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires
var output fftypes.JSONObject
obj, err := json.Marshal(reply)
if err != nil {
l.Errorf("Reply cannot be processed - conversion failed", reply)
}
err = json.Unmarshal(obj, &output)
if err != nil {
l.Errorf("Reply cannot be processed - conversion failed", reply)
}

l.Infof("Received operation update: status=%s request=%s tx=%s message=%s", updateType, reply.Headers.ReceiptID, reply.TxHash, reply.Message)
callbacks.OperationUpdate(ctx, plugin, reply.Headers.ReceiptID, updateType, reply.TxHash, reply.Message, output)
}
75 changes: 24 additions & 51 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,8 @@ const (
broadcastBatchEventSignature = "BatchPin(address,uint256,string,bytes32,bytes32,string,bytes32[])"
)

// Headers that allow us to construct the equivalent of a web socket notification
type wsrHeaders struct {
RequestID string `json:"requestId"`
Type string `json:"type"`
}

const (
ethTxStatusPending string = "Pending"
ethTxStatusSucceeded string = "Succeeded"
ethTxStatusFailed string = "Failed"
)

const (
ethTransactionUpdateSuccess string = "TransactionSuccess"
ethTransactionUpdateFailure string = "TransactionFailure"
ethTxStatusPending string = "Pending"
)

type Ethereum struct {
Expand Down Expand Up @@ -362,31 +349,6 @@ func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSON
return err
}

func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) {
l := log.L(ctx)

headers := reply.GetObject("headers")
requestID := headers.GetString("requestId")
replyType := headers.GetString("type")
txHash := reply.GetString("transactionHash")
message := reply.GetString("errorMessage")
if requestID == "" || replyType == "" {
l.Errorf("Reply cannot be processed - missing fields: %+v", reply)
return
}
var updateType core.OpStatus
switch replyType {
case "TransactionSuccess":
updateType = core.OpStatusSucceeded
case "TransactionUpdate":
updateType = core.OpStatusPending
default:
updateType = core.OpStatusFailed
}
l.Infof("Received operation update: status=%s request=%s tx=%s message=%s", updateType, requestID, txHash, message)
e.callbacks.OperationUpdate(ctx, e, requestID, updateType, txHash, message, reply)
}

func (e *Ethereum) buildEventLocationString(msgJSON fftypes.JSONObject) string {
return fmt.Sprintf("address=%s", msgJSON.GetString("address"))
}
Expand Down Expand Up @@ -496,7 +458,12 @@ func (e *Ethereum) eventLoop() {
}
}
if !isBatch {
e.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
switch receiptObj := msgParsed.(type) {
case common.BlockchainReceiptNotification:
common.HandleReceipt(ctx, e, &receiptObj, e.callbacks)
default:
l.Errorf("Message cannot be parsed as a receipt: %s\n%s", err, string(msgBytes))
}
}
default:
l.Errorf("Message unexpected: %+v", msgTyped)
Expand Down Expand Up @@ -982,20 +949,26 @@ func (e *Ethereum) GetTransactionStatus(ctx context.Context, operation *core.Ope
if err != nil || !res.IsSuccess() {
return nil, wrapError(ctx, &resErr, res, err)
}

headers := statusResponse.GetObject("headers")
txStatus := statusResponse.GetString("status")

// If the status has changed, mock up a WSR as if we'd received a web socket update
if operation.Status == core.OpStatusPending && txStatus != ethTxStatusPending {
var headers wsrHeaders
headers.RequestID = statusResponse.GetString("id")
switch txStatus {
case ethTxStatusSucceeded:
headers.Type = ethTransactionUpdateSuccess
case ethTxStatusFailed:
headers.Type = ethTransactionUpdateFailure
if txStatus != "" {
// If the status has changed, mock up blockchain receipt as if we'd received it
// as a web socket notification
if operation.Status == core.OpStatusPending && txStatus != ethTxStatusPending {
receipt := &common.BlockchainReceiptNotification{
Headers: common.BlockchainReceiptHeaders{
ReceiptID: headers.GetString("id"),
ReplyType: headers.GetString("type")},
TxHash: statusResponse.GetString("transactionId"),
Message: statusResponse.GetString("errorMessage")}
common.HandleReceipt(ctx, e, receipt, e.callbacks)
}
statusResponse["headers"] = headers
e.handleReceipt(ctx, statusResponse)
} else {
// Don't expect to get here so issue a warning
log.L(ctx).Warnf("Transaction status didn't include txStatus information")
}

return statusResponse, nil
}
13 changes: 7 additions & 6 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
}
e.SetOperationHandler("ns1", em)

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
operationID := fftypes.NewUUID()
data := fftypes.JSONAnyPtr(`{
"_id": "4373614c-e0f7-47b0-640e-7eacec417a9e",
Expand Down Expand Up @@ -1492,7 +1492,8 @@ func TestHandleReceiptTXSuccess(t *testing.T) {

err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)

common.HandleReceipt(context.Background(), e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand All @@ -1508,7 +1509,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
}
e.SetOperationHandler("ns1", em)

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
operationID := fftypes.NewUUID()
data := fftypes.JSONAnyPtr(`{
"created": "2022-08-03T18:55:42.671166Z",
Expand Down Expand Up @@ -1577,7 +1578,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {

err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand Down Expand Up @@ -1637,11 +1638,11 @@ func TestHandleMsgBatchBadData(t *testing.T) {
wsconn: wsm,
}

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
data := fftypes.JSONAnyPtr(`{}`)
err := json.Unmarshal(data.Bytes(), &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
}

func TestFormatNil(t *testing.T) {
Expand Down
24 changes: 2 additions & 22 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,26 +369,6 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb
})
}

func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) {
l := log.L(ctx)

headers := reply.GetObject("headers")
requestID := headers.GetString("requestId")
replyType := headers.GetString("type")
txHash := reply.GetString("transactionId")
message := reply.GetString("errorMessage")
if requestID == "" || replyType == "" {
l.Errorf("Reply cannot be processed: %+v", reply)
return
}
updateType := core.OpStatusSucceeded
if replyType != "TransactionSuccess" {
updateType = core.OpStatusFailed
}
l.Infof("Received operation update: status=%s request=%s tx=%s message=%s", updateType, requestID, txHash, message)
f.callbacks.OperationUpdate(ctx, f, requestID, updateType, txHash, message, reply)
}

func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, location *fftypes.JSONAny, firstEvent string) (string, error) {
fabricOnChainLocation, err := parseContractLocation(ctx, location)
if err != nil {
Expand Down Expand Up @@ -497,8 +477,8 @@ func (f *Fabric) eventLoop() {
if err == nil {
err = f.wsconn.Send(ctx, ack)
}
case map[string]interface{}:
f.handleReceipt(ctx, fftypes.JSONObject(msgTyped))
case common.BlockchainReceiptNotification:
common.HandleReceipt(ctx, f, &msgTyped, f.callbacks)
default:
l.Errorf("Message unexpected: %+v", msgTyped)
continue
Expand Down
16 changes: 8 additions & 8 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
}
e.SetOperationHandler("ns1", em)

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
operationID := fftypes.NewUUID()
data := []byte(`{
"_id": "748e7587-9e72-4244-7351-808f69b88291",
Expand All @@ -1322,7 +1322,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
"timeReceived": "2021-08-27T03:04:34.199742Z",
"type": "TransactionSuccess"
},
"transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2",
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2",
"receivedAt": 1630033474675
}`)

Expand All @@ -1335,7 +1335,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand All @@ -1352,11 +1352,11 @@ func TestHandleReceiptNoRequestID(t *testing.T) {
}
e.SetHandler("ns1", em)

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
data := []byte(`{}`)
err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
}

func TestHandleReceiptFailedTx(t *testing.T) {
Expand All @@ -1371,7 +1371,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
}
e.SetOperationHandler("ns1", em)

var reply fftypes.JSONObject
var reply common.BlockchainReceiptNotification
operationID := fftypes.NewUUID()
data := []byte(`{
"_id": "748e7587-9e72-4244-7351-808f69b88291",
Expand All @@ -1384,7 +1384,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
"type": "TransactionFailure"
},
"receivedAt": 1630033474675,
"transactionId": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
}`)

em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
Expand All @@ -1396,7 +1396,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {

err := json.Unmarshal(data, &reply)
assert.NoError(t, err)
e.handleReceipt(context.Background(), reply)
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)

em.AssertExpectations(t)
}
Expand Down

0 comments on commit ea28a80

Please sign in to comment.