Skip to content

Commit

Permalink
Merge pull request #448 from kaleido-io/blockchain-txids
Browse files Browse the repository at this point in the history
Add Blockchain Transaction IDs to FireFly Transaction objects and add blockchain IDs to events
  • Loading branch information
awrichar authored Jan 27, 2022
2 parents e3993c3 + 2636cfb commit 5526967
Show file tree
Hide file tree
Showing 51 changed files with 705 additions and 454 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ ALTER TABLE transactions ADD COLUMN info BYTEA;

CREATE INDEX transactions_protocol_id ON transactions(protocol_id);
CREATE INDEX transactions_ref ON transactions(ref);

DROP INDEX transactions_blockchain_ids;
ALTER TABLE transactions DROP COLUMN blockchain_ids;
COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ ALTER TABLE transactions DROP COLUMN signer;
ALTER TABLE transactions DROP COLUMN hash;
ALTER TABLE transactions DROP COLUMN protocol_id;
ALTER TABLE transactions DROP COLUMN info;

ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024);
CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids);
COMMIT;
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ ALTER TABLE transactions ADD COLUMN info BYTEA;

CREATE INDEX transactions_protocol_id ON transactions(protocol_id);
CREATE INDEX transactions_ref ON transactions(ref);

DROP INDEX transactions_blockchain_ids;
ALTER TABLE transactions DROP COLUMN blockchain_ids;
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ ALTER TABLE transactions DROP COLUMN signer;
ALTER TABLE transactions DROP COLUMN hash;
ALTER TABLE transactions DROP COLUMN protocol_id;
ALTER TABLE transactions DROP COLUMN info;

ALTER TABLE transactions ADD COLUMN blockchain_ids VARCHAR(1024);
CREATE INDEX transactions_blockchain_ids ON transactions(blockchain_ids);
22 changes: 22 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5078,6 +5078,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down Expand Up @@ -9837,6 +9841,11 @@ paths:
schema:
default: 120s
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: blockchainids
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: created
Expand Down Expand Up @@ -9902,6 +9911,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down Expand Up @@ -9945,6 +9958,11 @@ paths:
schema:
default: 120s
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: blockchainids
schema:
type: string
- description: 'Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^'
in: query
name: created
Expand Down Expand Up @@ -10010,6 +10028,10 @@ paths:
application/json:
schema:
properties:
blockchainIds:
items:
type: string
type: array
created: {}
id: {}
namespace:
Expand Down
2 changes: 1 addition & 1 deletion internal/batch/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func TestCalcPinsFail(t *testing.T) {
Messages: []*fftypes.Message{
{Header: fftypes.MessageHeader{
Group: gid,
Topics: fftypes.FFNameArray{"topic1"},
Topics: fftypes.FFStringArray{"topic1"},
}},
},
},
Expand Down
40 changes: 23 additions & 17 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ func ethHexFormatB32(b *fftypes.Bytes32) string {

func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
sBlockNumber := msgJSON.GetString("blockNumber")
sTransactionIndex := msgJSON.GetString("transactionIndex")
sTransactionHash := msgJSON.GetString("transactionHash")
blockNumber := msgJSON.GetInt64("blockNumber")
txIndex := msgJSON.GetInt64("transactionIndex")
logIndex := msgJSON.GetInt64("logIndex")
dataJSON := msgJSON.GetObject("data")
authorAddress := dataJSON.GetString("author")
ns := dataJSON.GetString("namespace")
Expand All @@ -205,7 +207,6 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
}

if sBlockNumber == "" ||
sTransactionIndex == "" ||
sTransactionHash == "" ||
authorAddress == "" ||
sUUIDs == "" ||
Expand Down Expand Up @@ -257,12 +258,13 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON
BatchPayloadRef: sPayloadRef,
Contexts: contexts,
Event: blockchain.Event{
Source: e.Name(),
Name: "BatchPin",
ProtocolID: sTransactionHash,
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
BlockchainTXID: sTransactionHash,
Source: e.Name(),
Name: "BatchPin",
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
},
}

Expand All @@ -272,6 +274,9 @@ func (e *Ethereum) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSON

func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONObject) (err error) {
sTransactionHash := msgJSON.GetString("transactionHash")
blockNumber := msgJSON.GetInt64("blockNumber")
txIndex := msgJSON.GetInt64("transactionIndex")
logIndex := msgJSON.GetInt64("logIndex")
sub := msgJSON.GetString("subId")
signature := msgJSON.GetString("signature")
dataJSON := msgJSON.GetObject("data")
Expand All @@ -284,19 +289,20 @@ func (e *Ethereum) handleContractEvent(ctx context.Context, msgJSON fftypes.JSON
}
delete(msgJSON, "data")

event := &blockchain.ContractEvent{
event := &blockchain.EventWithSubscription{
Subscription: sub,
Event: blockchain.Event{
Source: e.Name(),
Name: name,
ProtocolID: sTransactionHash,
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
BlockchainTXID: sTransactionHash,
Source: e.Name(),
Name: name,
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
Output: dataJSON,
Info: msgJSON,
Timestamp: timestamp,
},
}

return e.callbacks.ContractEvent(event)
return e.callbacks.BlockchainEvent(event)
}

func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error {
Expand All @@ -321,7 +327,7 @@ func (e *Ethereum) handleReceipt(ctx context.Context, reply fftypes.JSONObject)
updateType = fftypes.OpStatusFailed
}
l.Infof("Ethconnect '%s' reply: request=%s tx=%s message=%s", replyType, requestID, txHash, message)
return e.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply)
return e.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply)
}

func (e *Ethereum) handleMessageBatch(ctx context.Context, messages []interface{}) error {
Expand Down
14 changes: 10 additions & 4 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
em.On("BlockchainOpUpdate",
operationID,
fftypes.OpStatusSucceeded,
"0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8",
"",
mock.Anything).Return(nil)

Expand Down Expand Up @@ -1022,6 +1023,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
txsu := em.On("BlockchainOpUpdate",
operationID,
fftypes.OpStatusFailed,
"",
"Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument",
mock.Anything).Return(fmt.Errorf("Shutdown"))
done := make(chan struct{})
Expand Down Expand Up @@ -1292,15 +1294,19 @@ func TestHandleMessageContractEvent(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(nil)
em.On("BlockchainEvent", mock.MatchedBy(func(e *blockchain.EventWithSubscription) bool {
assert.Equal(t, "0xc26df2bf1a733e9249372d61eb11bd8662d26c8129df76890b1beb2f6fa72628", e.BlockchainTXID)
assert.Equal(t, "000000038011/000000/000050", e.Event.ProtocolID)
return true
})).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
assert.NoError(t, err)
err = e.handleMessageBatch(context.Background(), events)
assert.NoError(t, err)

ev := em.Calls[0].Arguments[0].(*blockchain.ContractEvent)
ev := em.Calls[0].Arguments[0].(*blockchain.EventWithSubscription)
assert.Equal(t, "sub2", ev.Subscription)
assert.Equal(t, "Changed", ev.Event.Name)

Expand Down Expand Up @@ -1351,7 +1357,7 @@ func TestHandleMessageContractEventNoTimestamp(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(nil)
em.On("BlockchainEvent", mock.Anything).Return(nil)

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
Expand Down Expand Up @@ -1387,7 +1393,7 @@ func TestHandleMessageContractEventError(t *testing.T) {
ID: "sb-b5b97a4e-a317-4053-6400-1474650efcb5",
}

em.On("ContractEvent", mock.Anything).Return(fmt.Errorf("pop"))
em.On("BlockchainEvent", mock.Anything).Return(fmt.Errorf("pop"))

var events []interface{}
err := json.Unmarshal(data.Bytes(), &events)
Expand Down
2 changes: 1 addition & 1 deletion internal/blockchain/ethereum/ffi_param_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestSchemaTypeInvalidFFIType(t *testing.T) {

func TestSchemaTypeMissing(t *testing.T) {
_, err := NewTestSchema(`{}`)
assert.Regexp(t, "missing properties: 'type'", err)
assert.Regexp(t, "missing properties", err)
}

func TestSchemaDetailsTypeMissing(t *testing.T) {
Expand Down
40 changes: 24 additions & 16 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb
}

sTransactionHash := msgJSON.GetString("transactionId")
blockNumber := msgJSON.GetInt64("blockNumber")
transactionIndex := msgJSON.GetInt64("transactionIndex")
eventIndex := msgJSON.GetInt64("eventIndex")
signer := payload.GetString("signer")
ns := payload.GetString("namespace")
sUUIDs := payload.GetString("uuids")
Expand Down Expand Up @@ -315,12 +318,13 @@ func (f *Fabric) handleBatchPinEvent(ctx context.Context, msgJSON fftypes.JSONOb
BatchPayloadRef: sPayloadRef,
Contexts: contexts,
Event: blockchain.Event{
Source: f.Name(),
Name: "BatchPin",
ProtocolID: sTransactionHash,
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
BlockchainTXID: sTransactionHash,
Source: f.Name(),
Name: "BatchPin",
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex),
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
},
}

Expand All @@ -337,6 +341,9 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb
delete(msgJSON, "payload")

sTransactionHash := msgJSON.GetString("transactionId")
blockNumber := msgJSON.GetInt64("blockNumber")
transactionIndex := msgJSON.GetInt64("transactionIndex")
eventIndex := msgJSON.GetInt64("eventIndex")
sub := msgJSON.GetString("subId")
name := msgJSON.GetString("eventName")
sTimestamp := msgJSON.GetString("timestamp")
Expand All @@ -346,19 +353,20 @@ func (f *Fabric) handleContractEvent(ctx context.Context, msgJSON fftypes.JSONOb
// Continue with zero timestamp
}

event := &blockchain.ContractEvent{
event := &blockchain.EventWithSubscription{
Subscription: sub,
Event: blockchain.Event{
Source: f.Name(),
Name: name,
ProtocolID: sTransactionHash,
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
BlockchainTXID: sTransactionHash,
Source: f.Name(),
Name: name,
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, transactionIndex, eventIndex),
Output: *payload,
Info: msgJSON,
Timestamp: fftypes.UnixTime(timestamp),
},
}

return f.callbacks.ContractEvent(event)
return f.callbacks.BlockchainEvent(event)
}

func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) error {
Expand All @@ -367,7 +375,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er
headers := reply.GetObject("headers")
requestID := headers.GetString("requestId")
replyType := headers.GetString("type")
txHash := reply.GetString("transactionHash")
txHash := reply.GetString("transactionId")
message := reply.GetString("errorMessage")
if requestID == "" || replyType == "" {
l.Errorf("Reply cannot be processed: %+v", reply)
Expand All @@ -383,7 +391,7 @@ func (f *Fabric) handleReceipt(ctx context.Context, reply fftypes.JSONObject) er
updateType = fftypes.OpStatusFailed
}
l.Infof("Fabconnect '%s' reply tx=%s (request=%s) %s", replyType, txHash, requestID, message)
return f.callbacks.BlockchainOpUpdate(operationID, updateType, message, reply)
return f.callbacks.BlockchainOpUpdate(operationID, updateType, txHash, message, reply)
}

func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) error {
Expand Down
Loading

0 comments on commit 5526967

Please sign in to comment.