From d68e71154b195d4bac20241efe2c4d9f67a20e20 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Wed, 29 Dec 2021 14:04:46 -0500 Subject: [PATCH 1/4] Add transaction index to the event Signed-off-by: Jim Zhang --- internal/events/api/event.go | 15 ++++++++------- internal/fabric/utils/blockdecoder.go | 13 +++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/internal/events/api/event.go b/internal/events/api/event.go index a6aba3c..5f08a36 100644 --- a/internal/events/api/event.go +++ b/internal/events/api/event.go @@ -61,11 +61,12 @@ func (info *SubscriptionInfo) GetID() string { } type EventEntry struct { - ChaincodeId string `json:"chaincodeId"` - BlockNumber uint64 `json:"blockNumber"` - TransactionId string `json:"transactionId"` - EventName string `json:"eventName"` - Payload interface{} `json:"payload"` - Timestamp uint64 `json:"timestamp,omitempty"` - SubID string `json:"subId"` + ChaincodeId string `json:"chaincodeId"` + BlockNumber uint64 `json:"blockNumber"` + TransactionId string `json:"transactionId"` + TransactionIndex int `json:"transactionIndex"` + EventName string `json:"eventName"` + Payload interface{} `json:"payload"` + Timestamp uint64 `json:"timestamp,omitempty"` + SubID string `json:"subId"` } diff --git a/internal/fabric/utils/blockdecoder.go b/internal/fabric/utils/blockdecoder.go index 6da9719..320c295 100644 --- a/internal/fabric/utils/blockdecoder.go +++ b/internal/fabric/utils/blockdecoder.go @@ -30,7 +30,7 @@ import ( func GetEvents(block *common.Block) []*api.EventEntry { events := []*api.EventEntry{} fb := toFilteredBlock(block) - for _, tx := range fb.GetFilteredTransactions() { + for idx, tx := range fb.GetFilteredTransactions() { actions := tx.GetTransactionActions() if actions != nil { ccActions := actions.GetChaincodeActions() @@ -38,11 +38,12 @@ func GetEvents(block *common.Block) []*api.EventEntry { for _, ccAction := range ccActions { event := ccAction.GetChaincodeEvent() eventEntry := api.EventEntry{ - ChaincodeId: event.ChaincodeId, - BlockNumber: fb.Number, - TransactionId: tx.Txid, - EventName: event.EventName, - Payload: event.Payload, + ChaincodeId: event.ChaincodeId, + BlockNumber: fb.Number, + TransactionId: tx.Txid, + TransactionIndex: idx, + EventName: event.EventName, + Payload: event.Payload, } events = append(events, &eventEntry) } From c52b369f2bf034118ebe0b47cb78a7d0a15589c4 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Wed, 12 Jan 2022 18:46:23 -0500 Subject: [PATCH 2/4] Add full block decoder Signed-off-by: Jim Zhang --- internal/events/api/event.go | 2 +- internal/events/eventstream_test.go | 120 ------ internal/events/submanager_test.go | 48 --- internal/events/subscription_test.go | 44 --- internal/events/test_helper.go | 236 ++++++++++++ internal/fabric/client/ledger.go | 8 +- internal/fabric/utils/block.go | 32 ++ internal/fabric/utils/blockdecoder.go | 405 ++++++++++++--------- internal/fabric/utils/blockdecoder_test.go | 100 +++++ internal/fabric/utils/rawblock.go | 124 +++++++ test/resources/block-1.bin | Bin 0 -> 4947 bytes test/resources/block-1.json | 156 ++++++++ test/resources/block-2.bin | Bin 0 -> 5356 bytes test/resources/block-2.json | 168 +++++++++ test/resources/genesis.block | Bin 0 -> 9282 bytes 15 files changed, 1050 insertions(+), 393 deletions(-) create mode 100644 internal/events/test_helper.go create mode 100644 internal/fabric/utils/block.go create mode 100644 internal/fabric/utils/blockdecoder_test.go create mode 100644 internal/fabric/utils/rawblock.go create mode 100644 test/resources/block-1.bin create mode 100644 test/resources/block-1.json create mode 100644 test/resources/block-2.bin create mode 100644 test/resources/block-2.json create mode 100644 test/resources/genesis.block diff --git a/internal/events/api/event.go b/internal/events/api/event.go index 5f08a36..77f5d8b 100644 --- a/internal/events/api/event.go +++ b/internal/events/api/event.go @@ -67,6 +67,6 @@ type EventEntry struct { TransactionIndex int `json:"transactionIndex"` EventName string `json:"eventName"` Payload interface{} `json:"payload"` - Timestamp uint64 `json:"timestamp,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` SubID string `json:"subId"` } diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 4322e13..8542265 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -17,19 +17,12 @@ package events import ( - "encoding/json" "fmt" - "net/http" - "net/http/httptest" "strings" "sync" "testing" "time" - "github.com/hyperledger/fabric-protos-go/common" - "github.com/hyperledger/fabric-protos-go/peer" - "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" - eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" "github.com/hyperledger/firefly-fabconnect/internal/conf" "github.com/hyperledger/firefly-fabconnect/internal/errors" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" @@ -40,119 +33,6 @@ import ( "github.com/stretchr/testify/mock" ) -func newTestStreamForBatching(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *httptest.Server, chan []*eventsapi.EventEntry) { - mux := http.NewServeMux() - eventStream := make(chan []*eventsapi.EventEntry) - count := 0 - mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { - var events []*eventsapi.EventEntry - _ = json.NewDecoder(req.Body).Decode(&events) - eventStream <- events - idx := count - if idx >= len(status) { - idx = len(status) - 1 - } - res.WriteHeader(status[idx]) - count++ - }) - svr := httptest.NewServer(mux) - if spec.Type == "" { - spec.Type = "webhook" - spec.Webhook.URL = svr.URL - spec.Webhook.Headers = map[string]string{"x-my-header": "my-value"} - } - sm := newTestSubscriptionManager() - sm.config.WebhooksAllowPrivateIPs = true - sm.config.PollingIntervalSec = 0 - if db != nil { - sm.db = db - } - mockstore, ok := sm.db.(*mockkvstore.KVStore) - if ok { - mockstore.On("Get", mock.Anything).Return([]byte(""), nil) - mockstore.On("Put", mock.Anything, mock.Anything).Return(nil) - } - - _ = sm.addStream(spec) - return sm, sm.streams[spec.ID], svr, eventStream -} - -func newTestStreamForWebSocket(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *mockWebSocket) { - sm := newTestSubscriptionManager() - sm.config.PollingIntervalSec = 0 - if db != nil { - sm.db = db - } - _ = sm.addStream(spec) - return sm, sm.streams[spec.ID], sm.wsChannels.(*mockWebSocket) -} - -func testEvent(subID string) *eventData { - entry := &eventsapi.EventEntry{ - SubID: subID, - } - return &eventData{ - event: entry, - batchComplete: func(*eventsapi.EventEntry) {}, - } -} - -func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { - rpc := &mockfabric.RPCClient{} - blockEventChan := make(chan *fab.BlockEvent) - ccEventChan := make(chan *fab.CCEvent) - var roBlockEventChan <-chan *fab.BlockEvent = blockEventChan - var roCCEventChan <-chan *fab.CCEvent = ccEventChan - res := &fab.BlockchainInfoResponse{ - BCI: &common.BlockchainInfo{ - Height: 10, - }, - } - rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil) - rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil) - rpc.On("Unregister", mock.Anything).Return() - - go func() { - if fromBlock == "0" { - blockEventChan <- &fab.BlockEvent{ - Block: constructBlock(1), - } - } - blockEventChan <- &fab.BlockEvent{ - Block: constructBlock(11), - } - if len(withReset) > 0 { - blockEventChan <- &fab.BlockEvent{ - Block: constructBlock(11), - } - } - }() - - return rpc -} - -func setupTestSubscription(sm *subscriptionMGR, stream *eventStream, subscriptionName, fromBlock string, withReset ...bool) *eventsapi.SubscriptionInfo { - rpc := mockRPCClient(fromBlock, withReset...) - sm.rpc = rpc - spec := &eventsapi.SubscriptionInfo{ - Name: subscriptionName, - Stream: stream.spec.ID, - } - if fromBlock != "" { - spec.FromBlock = fromBlock - } - _ = sm.addSubscription(spec) - - return spec -} - -func constructBlock(number uint64) *common.Block { - mockTx := eventmocks.NewTransactionWithCCEvent("testTxID", peer.TxValidationCode_VALID, "testChaincodeID", "testCCEventName", []byte("testPayload")) - mockBlock := eventmocks.NewBlock("testChannelID", mockTx) - mockBlock.Header.Number = number - return mockBlock -} - func TestConstructorNoSpec(t *testing.T) { assert := assert.New(t) _, err := newEventStream(newTestSubscriptionManager(), nil, nil) diff --git a/internal/events/submanager_test.go b/internal/events/submanager_test.go index 6fd3947..c444b21 100644 --- a/internal/events/submanager_test.go +++ b/internal/events/submanager_test.go @@ -17,65 +17,17 @@ package events import ( "io/ioutil" "net/http/httptest" - "os" "path" "testing" "time" - "github.com/hyperledger/firefly-fabconnect/internal/conf" "github.com/hyperledger/firefly-fabconnect/internal/events/api" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" "github.com/hyperledger/firefly-fabconnect/internal/kvstore" - mockkvstore "github.com/hyperledger/firefly-fabconnect/mocks/kvstore" "github.com/julienschmidt/httprouter" "github.com/stretchr/testify/assert" ) -type mockWebSocket struct { - capturedNamespace string - sender chan interface{} - broadcast chan interface{} - receiver chan error - closing chan struct{} -} - -func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) { - m.capturedNamespace = namespace - return m.sender, m.broadcast, m.receiver, m.closing -} - -func (m *mockWebSocket) SendReply(message interface{}) {} - -func tempdir(t *testing.T) string { - dir, _ := ioutil.TempDir("", "fly") - t.Logf("tmpdir/create: %s", dir) - return dir -} - -func cleanup(t *testing.T, dir string) { - t.Logf("tmpdir/cleanup: %s [dir]", dir) - os.RemoveAll(dir) -} - -func newMockWebSocket() *mockWebSocket { - return &mockWebSocket{ - sender: make(chan interface{}), - broadcast: make(chan interface{}), - receiver: make(chan error), - closing: make(chan struct{}), - } -} - -func newTestSubscriptionManager() *subscriptionMGR { - smconf := &conf.EventstreamConf{} - rpc := mockRPCClient("") - sm := NewSubscriptionManager(smconf, rpc, newMockWebSocket()).(*subscriptionMGR) - sm.db = &mockkvstore.KVStore{} - sm.config.WebhooksAllowPrivateIPs = true - sm.config.PollingIntervalSec = 0 - return sm -} - func TestInitLevelDBSuccess(t *testing.T) { assert := assert.New(t) dir := tempdir(t) diff --git a/internal/events/subscription_test.go b/internal/events/subscription_test.go index 95d2f7f..8d1b3f7 100644 --- a/internal/events/subscription_test.go +++ b/internal/events/subscription_test.go @@ -18,53 +18,9 @@ import ( "fmt" "testing" - "github.com/hyperledger/firefly-fabconnect/internal/conf" - eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" "github.com/stretchr/testify/assert" ) -type mockSubMgr struct { - stream *eventStream - subscription *subscription - err error - subscriptions []*subscription -} - -func (m *mockSubMgr) getConfig() *conf.EventstreamConf { - return &conf.EventstreamConf{} -} - -func (m *mockSubMgr) streamByID(string) (*eventStream, error) { - return m.stream, m.err -} - -func (m *mockSubMgr) subscriptionByID(string) (*subscription, error) { - return m.subscription, m.err -} - -func (m *mockSubMgr) subscriptionsForStream(string) []*subscription { - return m.subscriptions -} - -func (m *mockSubMgr) loadCheckpoint(string) (map[string]uint64, error) { return nil, nil } - -func (m *mockSubMgr) storeCheckpoint(string, map[string]uint64) error { return nil } - -func testSubInfo(name string) *eventsapi.SubscriptionInfo { - return &eventsapi.SubscriptionInfo{ID: "test", Stream: "streamID", Name: name} -} - -func newTestStream(submgr subscriptionManager) *eventStream { - a, _ := newEventStream(submgr, &StreamInfo{ - ID: "123", - Type: "WebHook", - Webhook: &webhookActionInfo{ - URL: "http://hello.example.com/world", - }, - }, nil) - return a -} - func TestCreateWebhookSub(t *testing.T) { assert := assert.New(t) diff --git a/internal/events/test_helper.go b/internal/events/test_helper.go new file mode 100644 index 0000000..291e590 --- /dev/null +++ b/internal/events/test_helper.go @@ -0,0 +1,236 @@ +// Copyright 2019 Kaleido + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" + eventmocks "github.com/hyperledger/fabric-sdk-go/pkg/fab/events/service/mocks" + + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/peer" + "github.com/hyperledger/firefly-fabconnect/internal/conf" + eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/kvstore" + mockfabric "github.com/hyperledger/firefly-fabconnect/mocks/fabric/client" + mockkvstore "github.com/hyperledger/firefly-fabconnect/mocks/kvstore" + "github.com/stretchr/testify/mock" +) + +func tempdir(t *testing.T) string { + dir, _ := ioutil.TempDir("", "fly") + t.Logf("tmpdir/create: %s", dir) + return dir +} + +func cleanup(t *testing.T, dir string) { + t.Logf("tmpdir/cleanup: %s [dir]", dir) + os.RemoveAll(dir) +} + +type mockWebSocket struct { + capturedNamespace string + sender chan interface{} + broadcast chan interface{} + receiver chan error + closing chan struct{} +} + +func (m *mockWebSocket) GetChannels(namespace string) (chan<- interface{}, chan<- interface{}, <-chan error, <-chan struct{}) { + m.capturedNamespace = namespace + return m.sender, m.broadcast, m.receiver, m.closing +} + +func (m *mockWebSocket) SendReply(message interface{}) {} + +func newMockWebSocket() *mockWebSocket { + return &mockWebSocket{ + sender: make(chan interface{}), + broadcast: make(chan interface{}), + receiver: make(chan error), + closing: make(chan struct{}), + } +} + +type mockSubMgr struct { + stream *eventStream + subscription *subscription + err error + subscriptions []*subscription +} + +func (m *mockSubMgr) getConfig() *conf.EventstreamConf { + return &conf.EventstreamConf{} +} + +func (m *mockSubMgr) streamByID(string) (*eventStream, error) { + return m.stream, m.err +} + +func (m *mockSubMgr) subscriptionByID(string) (*subscription, error) { + return m.subscription, m.err +} + +func (m *mockSubMgr) subscriptionsForStream(string) []*subscription { + return m.subscriptions +} + +func (m *mockSubMgr) loadCheckpoint(string) (map[string]uint64, error) { return nil, nil } + +func (m *mockSubMgr) storeCheckpoint(string, map[string]uint64) error { return nil } + +func testSubInfo(name string) *eventsapi.SubscriptionInfo { + return &eventsapi.SubscriptionInfo{ID: "test", Stream: "streamID", Name: name} +} + +func newTestStream(submgr subscriptionManager) *eventStream { + a, _ := newEventStream(submgr, &StreamInfo{ + ID: "123", + Type: "WebHook", + Webhook: &webhookActionInfo{ + URL: "http://hello.example.com/world", + }, + }, nil) + return a +} + +func newTestSubscriptionManager() *subscriptionMGR { + smconf := &conf.EventstreamConf{} + rpc := mockRPCClient("") + sm := NewSubscriptionManager(smconf, rpc, newMockWebSocket()).(*subscriptionMGR) + sm.db = &mockkvstore.KVStore{} + sm.config.WebhooksAllowPrivateIPs = true + sm.config.PollingIntervalSec = 0 + return sm +} + +func newTestStreamForBatching(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *httptest.Server, chan []*eventsapi.EventEntry) { + mux := http.NewServeMux() + eventStream := make(chan []*eventsapi.EventEntry) + count := 0 + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + var events []*eventsapi.EventEntry + _ = json.NewDecoder(req.Body).Decode(&events) + eventStream <- events + idx := count + if idx >= len(status) { + idx = len(status) - 1 + } + res.WriteHeader(status[idx]) + count++ + }) + svr := httptest.NewServer(mux) + if spec.Type == "" { + spec.Type = "webhook" + spec.Webhook.URL = svr.URL + spec.Webhook.Headers = map[string]string{"x-my-header": "my-value"} + } + sm := newTestSubscriptionManager() + sm.config.WebhooksAllowPrivateIPs = true + sm.config.PollingIntervalSec = 0 + if db != nil { + sm.db = db + } + mockstore, ok := sm.db.(*mockkvstore.KVStore) + if ok { + mockstore.On("Get", mock.Anything).Return([]byte(""), nil) + mockstore.On("Put", mock.Anything, mock.Anything).Return(nil) + } + + _ = sm.addStream(spec) + return sm, sm.streams[spec.ID], svr, eventStream +} + +func newTestStreamForWebSocket(spec *StreamInfo, db kvstore.KVStore, status ...int) (*subscriptionMGR, *eventStream, *mockWebSocket) { + sm := newTestSubscriptionManager() + sm.config.PollingIntervalSec = 0 + if db != nil { + sm.db = db + } + _ = sm.addStream(spec) + return sm, sm.streams[spec.ID], sm.wsChannels.(*mockWebSocket) +} + +func testEvent(subID string) *eventData { + entry := &eventsapi.EventEntry{ + SubID: subID, + } + return &eventData{ + event: entry, + batchComplete: func(*eventsapi.EventEntry) {}, + } +} + +func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { + rpc := &mockfabric.RPCClient{} + blockEventChan := make(chan *fab.BlockEvent) + ccEventChan := make(chan *fab.CCEvent) + var roBlockEventChan <-chan *fab.BlockEvent = blockEventChan + var roCCEventChan <-chan *fab.CCEvent = ccEventChan + res := &fab.BlockchainInfoResponse{ + BCI: &common.BlockchainInfo{ + Height: 10, + }, + } + rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil) + rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil) + rpc.On("Unregister", mock.Anything).Return() + + go func() { + if fromBlock == "0" { + blockEventChan <- &fab.BlockEvent{ + Block: constructBlock(1), + } + } + blockEventChan <- &fab.BlockEvent{ + Block: constructBlock(11), + } + if len(withReset) > 0 { + blockEventChan <- &fab.BlockEvent{ + Block: constructBlock(11), + } + } + }() + + return rpc +} + +func setupTestSubscription(sm *subscriptionMGR, stream *eventStream, subscriptionName, fromBlock string, withReset ...bool) *eventsapi.SubscriptionInfo { + rpc := mockRPCClient(fromBlock, withReset...) + sm.rpc = rpc + spec := &eventsapi.SubscriptionInfo{ + Name: subscriptionName, + Stream: stream.spec.ID, + } + if fromBlock != "" { + spec.FromBlock = fromBlock + } + _ = sm.addSubscription(spec) + + return spec +} + +func constructBlock(number uint64) *common.Block { + mockTx := eventmocks.NewTransactionWithCCEvent("testTxID", peer.TxValidationCode_VALID, "testChaincodeID", "testCCEventName", []byte("testPayload")) + mockBlock := eventmocks.NewBlock("testChannelID", mockTx) + mockBlock.Header.Number = number + return mockBlock +} diff --git a/internal/fabric/client/ledger.go b/internal/fabric/client/ledger.go index 5e657ac..1b8c29f 100644 --- a/internal/fabric/client/ledger.go +++ b/internal/fabric/client/ledger.go @@ -70,12 +70,16 @@ func (l *ledgerClientWrapper) queryTransaction(channelId, signer, txId string) ( if err != nil { return nil, err } - tx, err := utils.DecodeBlockDataEnvelope(result.TransactionEnvelope) + bloc := &utils.RawBlock{} + envelope, tx, err := bloc.DecodeBlockDataEnvelope(result.TransactionEnvelope) if err != nil { return nil, err } - return tx, nil + ret := make(map[string]interface{}) + ret["tx"] = tx + ret["raw"] = envelope + return ret, nil } func (l *ledgerClientWrapper) getLedgerClient(channelId, signer string) (ledgerClient *ledger.Client, err error) { diff --git a/internal/fabric/utils/block.go b/internal/fabric/utils/block.go new file mode 100644 index 0000000..df21fa8 --- /dev/null +++ b/internal/fabric/utils/block.go @@ -0,0 +1,32 @@ +// Copyright 2021 Kaleido +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +type Block struct { + Number uint64 + Timestamp int64 // unix nano + Transactions []*Transaction +} + +type Transaction struct { + Type string + Status string + Proposals []*Proposal +} + +type Proposal struct { +} diff --git a/internal/fabric/utils/blockdecoder.go b/internal/fabric/utils/blockdecoder.go index 320c295..b56b7fe 100644 --- a/internal/fabric/utils/blockdecoder.go +++ b/internal/fabric/utils/blockdecoder.go @@ -17,261 +17,310 @@ package utils import ( - "encoding/hex" + "strconv" "github.com/golang/protobuf/proto" //nolint "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/msp" "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/firefly-fabconnect/internal/events/api" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) func GetEvents(block *common.Block) []*api.EventEntry { events := []*api.EventEntry{} - fb := toFilteredBlock(block) - for idx, tx := range fb.GetFilteredTransactions() { - actions := tx.GetTransactionActions() - if actions != nil { - ccActions := actions.GetChaincodeActions() - if len(ccActions) > 0 { - for _, ccAction := range ccActions { - event := ccAction.GetChaincodeEvent() - eventEntry := api.EventEntry{ - ChaincodeId: event.ChaincodeId, - BlockNumber: fb.Number, - TransactionId: tx.Txid, - TransactionIndex: idx, - EventName: event.EventName, - Payload: event.Payload, - } - events = append(events, &eventEntry) - } + rawBlock, _, err := DecodeBlock(block) + if err != nil { + return events + } + for idx, entry := range rawBlock.Data.Data { + actions := entry.Payload.Data.Actions + for _, action := range actions { + event := action.Payload.Action.ProposalResponsePayload.Extension.Events + if event == nil { + continue + } + eventEntry := api.EventEntry{ + ChaincodeId: event.ChaincodeId, + BlockNumber: rawBlock.Header.Number, + TransactionId: event.TxId, + TransactionIndex: idx, + EventName: event.EventName, + Payload: event.Payload, + Timestamp: rawBlock.timestamp, } + events = append(events, &eventEntry) } } return events } -func DecodeBlock(block *common.Block) ([]map[string]interface{}, error) { +func DecodeBlock(block *common.Block) (*RawBlock, *Block, error) { + rawblock := &RawBlock{} + rawblock.Header = block.Header + rawblock.Metadata = block.Metadata + blockdata := &BlockData{} + rawblock.Data = blockdata + dataEnvs := make([]*BlockDataEnvelope, len(block.Data.Data)) + blockdata.Data = dataEnvs + + bloc := &Block{} + transactions := make([]*Transaction, len(block.Data.Data)) + bloc.Transactions = transactions + // this array in the block header's metadata contains each transaction's status code txFilter := []byte(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) - - result := make([]map[string]interface{}, len(block.Data.Data)) for i, data := range block.Data.Data { env, err := getEnvelopeFromBlock(data) if err != nil { - return nil, errors.Wrap(err, "error decoding Envelope from block") + return nil, nil, errors.Wrap(err, "error decoding Envelope from block") } if env == nil { - return nil, errors.New("nil envelope") + return nil, nil, errors.New("nil envelope") } - tx, err := DecodeBlockDataEnvelope(env) + data, tx, err := rawblock.DecodeBlockDataEnvelope(env) if err != nil { - return nil, errors.Wrap(err, "error decoding block data envelope") + return nil, nil, errors.Wrap(err, "error decoding block data envelope") } - entry := make(map[string]interface{}) - entry["validationCode"] = peer.TxValidationCode(txFilter[i]) - entry["transaction"] = tx + dataEnvs[i] = data + transactions[i] = tx + tx.Status = peer.TxValidationCode(txFilter[i]).String() } - return result, nil + bloc.Number = rawblock.Header.Number + bloc.Timestamp = rawblock.timestamp + + return rawblock, bloc, nil } -func toFilteredBlock(block *common.Block) *peer.FilteredBlock { - var channelID string - var filteredTxs []*peer.FilteredTransaction - // this array in the block header's metadata contains each transaction's status code - txFilter := []byte(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER]) +func (block *RawBlock) DecodeBlockDataEnvelope(env *common.Envelope) (*BlockDataEnvelope, *Transaction, error) { + // used for the raw block + dataEnv := &BlockDataEnvelope{} + dataEnv.Signature = string(env.Signature) - for i, data := range block.Data.Data { - filteredTx, chID, err := GetFilteredTxFromBlockData(data, peer.TxValidationCode(txFilter[i])) - if err != nil { - log.Warnf("error decoding Envelope from block: %s", err) - continue - } - channelID = chID - filteredTxs = append(filteredTxs, filteredTx) - } + // used in the user-friendly block + transaction := &Transaction{} - return &peer.FilteredBlock{ - ChannelId: channelID, - Number: block.Header.Number, - FilteredTransactions: filteredTxs, - } -} + _payload := &Payload{} + dataEnv.Payload = _payload -func GetFilteredTxFromBlockData(data []byte, txValidationCode peer.TxValidationCode) (*peer.FilteredTransaction, string, error) { - env, err := getEnvelopeFromBlock(data) + payload, err := UnmarshalPayload(env.Payload) if err != nil { - return nil, "", errors.Wrap(err, "error decoding Envelope from block") + return nil, nil, errors.Wrap(err, "error decoding Payload from envelope") } - if env == nil { - return nil, "", errors.New("nil envelope") + + err = block.decodePayload(payload, _payload, transaction) + if err != nil { + return nil, nil, err } - return GetFilteredTxFromEnvelope(env, txValidationCode) + return dataEnv, transaction, nil } -func DecodeBlockDataEnvelope(env *common.Envelope) (map[string]interface{}, error) { - payload, err := UnmarshalPayload(env.Payload) - if err != nil { - return nil, errors.Wrap(err, "error decoding Payload from envelope") +func (block *RawBlock) decodePayload(payload *common.Payload, _payload *Payload, _transaction *Transaction) error { + _payloadData := &PayloadData{} + _payload.Data = _payloadData + _payloadHeader := &PayloadHeader{} + _payload.Header = _payloadHeader + + if err := block.decodePayloadHeader(payload.Header, _payloadHeader); err != nil { + return err } - result := make(map[string]interface{}) + if err := block.decodePayloadData(payload.Data, _payloadData, _transaction); err != nil { + return err + } + + return nil +} +func (block *RawBlock) decodePayloadHeader(header *common.Header, _header *PayloadHeader) error { + _channelHeader := &ChannelHeader{} channelHeader := &common.ChannelHeader{} - if err := proto.Unmarshal(payload.Header.ChannelHeader, channelHeader); err != nil { - return nil, errors.Wrap(err, "error decoding ChannelHeader from payload") + if err := proto.Unmarshal(header.ChannelHeader, channelHeader); err != nil { + return errors.Wrap(err, "error decoding ChannelHeader from payload") } + _channelHeader.ChannelId = channelHeader.ChannelId + _channelHeader.Epoch = strconv.FormatUint(channelHeader.Epoch, 10) + _channelHeader.Timestamp = channelHeader.Timestamp.AsTime().UnixNano() + _channelHeader.TxId = channelHeader.TxId + _channelHeader.Type = common.HeaderType_name[channelHeader.Type] + _channelHeader.Version = int(channelHeader.Version) + _header.ChannelHeader = _channelHeader + block.timestamp = _channelHeader.Timestamp + block.channelHeader = _channelHeader + + _signatureHeader := &SignatureHeader{} + _header.SignatureHeader = _signatureHeader + + return nil +} - // TODO: support other block types (1=ConfigEvelope, 2=ConfigUpdateEnvelope) - if channelHeader.Type == 3 { - result["type"] = "EndorserTransaction" +func (block *RawBlock) decodeSignatureHeader(_signatureHeader *SignatureHeader, bytes []byte) error { + signatureHeader := &common.SignatureHeader{} + if err := proto.Unmarshal(bytes, signatureHeader); err != nil { + return errors.Wrap(err, "error decoding SignatureHeader from payload") + } + _signatureHeader.Nonce = string(signatureHeader.Nonce) + creator := &msp.SerializedIdentity{} + if err := proto.Unmarshal(signatureHeader.Creator, creator); err != nil { + return errors.Wrap(err, "error decoding Creator from signature header") + } + _signatureHeader.Creator = creator + return nil +} + +func (block *RawBlock) decodePayloadData(payloadData []byte, _payloadData *PayloadData, _transaction *Transaction) error { + _transaction.Type = block.channelHeader.Type + // TODO: support other block types (1=ConfigEvelope, 2=ConfigUpdateEnvelope) + if block.channelHeader.Type == common.HeaderType_name[3] { tx := &peer.Transaction{} - if err := proto.Unmarshal(payload.Data, tx); err != nil { - return nil, errors.Wrap(err, "error decoding transaction payload data") + if err := proto.Unmarshal(payloadData, tx); err != nil { + return errors.Wrap(err, "error decoding transaction payload data") } - transactions := make([]map[string]interface{}, len(tx.Actions)) - result["transactions"] = transactions + // each transaction may pack multiple proposals, for instance a DvP transaction that + // includes both a proposal for the payment and a proposal for the delivery. + _actions := make([]*Action, len(tx.Actions)) + _payloadData.Actions = _actions + + proposals := make([]*Proposal, len(tx.Actions)) + _transaction.Proposals = proposals for i, action := range tx.Actions { - transaction := make(map[string]interface{}) - transactions[i] = transaction - - chaincodeActionPayload := make(map[string]interface{}) - transaction["payload"] = chaincodeActionPayload - cap := &peer.ChaincodeActionPayload{} - if err := proto.Unmarshal(action.Payload, cap); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode action payload") - } - // proposal payload part of the chaincode action - chaincodeProposalPayload := make(map[string]interface{}) - chaincodeActionPayload["chaincodeProposalPayload"] = chaincodeProposalPayload - cpp := &peer.ChaincodeProposalPayload{} - if err = proto.Unmarshal(cap.ChaincodeProposalPayload, cpp); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode proposal payload") + _action := &Action{} + _signatureHeader := &SignatureHeader{} + _action.Header = _signatureHeader + if err := block.decodeSignatureHeader(_signatureHeader, action.Header); err != nil { + return err } - chaincodeInvocationSpec := make(map[string]interface{}) - chaincodeProposalPayload["input"] = chaincodeInvocationSpec - ccSpec := &peer.ChaincodeInvocationSpec{} - if err := proto.Unmarshal(cpp.Input, ccSpec); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode invocation spec") - } - - chaincodeInvocationSpec["type"] = ccSpec.ChaincodeSpec.Type - chaincodeInvocationSpec["chaincodeId"] = ccSpec.ChaincodeSpec.ChaincodeId + _actions[i] = _action - chaincodeInput := make(map[string]interface{}) - chaincodeInputArgs := make([]string, len(ccSpec.ChaincodeSpec.Input.Args)) - for j, arg := range ccSpec.ChaincodeSpec.Input.Args { - chaincodeInputArgs[j] = string(arg) - } - chaincodeInput["args"] = chaincodeInputArgs - chaincodeInput["isInit"] = ccSpec.ChaincodeSpec.Input.IsInit - chaincodeInvocationSpec["input"] = chaincodeInput - - chaincodeProposal := make(map[string]interface{}) - chaincodeProposal["input"] = chaincodeInput - transaction["chaincodeProposal"] = chaincodeProposal - - // endorsed action part of the chaincode action - chaincodeEndorsedAction := make(map[string]interface{}) - chaincodeActionPayload["action"] = chaincodeEndorsedAction - - proposalResponsePayload := make(map[string]interface{}) - chaincodeEndorsedAction["proposalResponsePayload"] = proposalResponsePayload - prp := &peer.ProposalResponsePayload{} - if err = proto.Unmarshal(cap.Action.ProposalResponsePayload, prp); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode proposal response payload") - } + proposal := &Proposal{} + proposals[i] = proposal - proposalResponsePayload["proposalHash"] = hex.EncodeToString(prp.ProposalHash) + _actionPayload := &ActionPayload{} + _action.Payload = _actionPayload - extension := make(map[string]interface{}) - proposalResponsePayload["extension"] = extension - cca := &peer.ChaincodeAction{} - if err = proto.Unmarshal(prp.Extension, cca); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode action") + if err := block.decodeActionPayload(_actionPayload, action.Payload); err != nil { + return err } - // skipping read/write sets in the action - // decode events - chaincodeEvent := make(map[string]interface{}) - extension["events"] = chaincodeEvent - ccevt := &peer.ChaincodeEvent{} - if err = proto.Unmarshal(cca.Events, ccevt); err != nil { - return nil, errors.Wrap(err, "error decoding chaincode event") - } - chaincodeEvent["chaincodeId"] = ccevt.ChaincodeId - chaincodeEvent["txId"] = ccevt.TxId - chaincodeEvent["eventName"] = ccevt.EventName - chaincodeEvent["payload"] = string(ccevt.Payload) } } - return result, nil + return nil } -func GetFilteredTxFromEnvelope(env *common.Envelope, txValidationCode peer.TxValidationCode) (*peer.FilteredTransaction, string, error) { - payload, err := UnmarshalPayload(env.Payload) - if err != nil { - return nil, "", errors.Wrap(err, "error decoding Payload from envelope") - } +func (block *RawBlock) decodeActionPayload(_actionPayload *ActionPayload, payload []byte) error { + _actionPayloadAction := &ActionPayloadAction{} + _actionPayload.Action = _actionPayloadAction - channelHeaderBytes := payload.Header.ChannelHeader - channelHeader := &common.ChannelHeader{} - if err := proto.Unmarshal(channelHeaderBytes, channelHeader); err != nil { - return nil, "", errors.Wrap(err, "error decoding ChannelHeader from payload") + _chaincodeProposalPayload := &ChaincodeProposalPayload{} + _actionPayload.ChaincodeProposalPayload = _chaincodeProposalPayload + + cap := &peer.ChaincodeActionPayload{} + if err := proto.Unmarshal(payload, cap); err != nil { + return errors.Wrap(err, "error decoding chaincode action payload") } - filteredTx := &peer.FilteredTransaction{ - Type: common.HeaderType(channelHeader.Type), - Txid: channelHeader.TxId, - TxValidationCode: txValidationCode, + if err := block.decodeActionPayloadAction(_actionPayloadAction, cap.Action); err != nil { + return err } - if common.HeaderType(channelHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION { - actions, err := getFilteredTransactionActions(payload.Data) - if err != nil { - return nil, "", errors.Wrap(err, "error getting filtered transaction actions") - } - filteredTx.Data = actions + if err := block.decodeActionPayloadChaincodeProposalPayload(_chaincodeProposalPayload, cap.ChaincodeProposalPayload); err != nil { + return err } - return filteredTx, channelHeader.ChannelId, nil + + return nil } -func getFilteredTransactionActions(data []byte) (*peer.FilteredTransaction_TransactionActions, error) { - actions := &peer.FilteredTransaction_TransactionActions{ - TransactionActions: &peer.FilteredTransactionActions{}, +func (block *RawBlock) decodeActionPayloadChaincodeProposalPayload(chaincodeProposalPayload *ChaincodeProposalPayload, bytes []byte) error { + cpp := &peer.ChaincodeProposalPayload{} + if err := proto.Unmarshal(bytes, cpp); err != nil { + return errors.Wrap(err, "error decoding chaincode proposal payload") } - tx, err := UnmarshalTransaction(data) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling transaction payload") + + chaincodeProposalPayload.TransientMap = &cpp.TransientMap + + proposalPayloadInput := &ProposalPayloadInput{} + chaincodeProposalPayload.Input = proposalPayloadInput + + ccSpec := &peer.ChaincodeInvocationSpec{} + if err := proto.Unmarshal(cpp.Input, ccSpec); err != nil { + return errors.Wrap(err, "error decoding chaincode invocation spec") } - chaincodeActionPayload, err := UnmarshalChaincodeActionPayload(tx.Actions[0].Payload) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling chaincode action payload") + + if ccSpec.ChaincodeSpec != nil { + chaincodeSpec := &ChaincodeSpec{} + chaincodeSpec.Type = ccSpec.ChaincodeSpec.Type.String() + proposalPayloadInput.ChaincodeSpec = chaincodeSpec + + chaincodeSpec.ChaincodeId = ccSpec.ChaincodeSpec.ChaincodeId + + chaincodeInput := &ChaincodeSpecInput{} + chaincodeSpec.Input = chaincodeInput + + chaincodeInputArgs := make([]string, len(ccSpec.ChaincodeSpec.Input.Args)) + for j, arg := range ccSpec.ChaincodeSpec.Input.Args { + chaincodeInputArgs[j] = string(arg) + } + chaincodeInput.Args = chaincodeInputArgs + chaincodeInput.IsInit = ccSpec.ChaincodeSpec.Input.IsInit } - propRespPayload, err := UnmarshalProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling response payload") + + return nil +} + +func (block *RawBlock) decodeActionPayloadAction(_actionPayloadAction *ActionPayloadAction, action *peer.ChaincodeEndorsedAction) error { + _proposalResponsePayload := &ProposalResponsePayload{} + _actionPayloadAction.ProposalResponsePayload = _proposalResponsePayload + if err := block.decodeProposalResponsePayload(_proposalResponsePayload, action.ProposalResponsePayload); err != nil { + return err } - ccAction, err := UnmarshalChaincodeAction(propRespPayload.Extension) - if err != nil { - return nil, errors.Wrap(err, "error unmarshalling chaincode action") + return nil +} + +func (block *RawBlock) decodeProposalResponsePayload(_proposalResponsePayload *ProposalResponsePayload, proposalResponsePayload []byte) error { + prp := &peer.ProposalResponsePayload{} + if err := proto.Unmarshal(proposalResponsePayload, prp); err != nil { + return errors.Wrap(err, "error decoding chaincode proposal response payload") } - ccEvent, err := UnmarshalChaincodeEvents(ccAction.Events) - if err != nil { - return nil, errors.Wrap(err, "error getting chaincode events") + + _proposalResponsePayload.ProposalHash = string(prp.ProposalHash) + + _extension := &Extension{} + _proposalResponsePayload.Extension = _extension + if err := block.decodeProposalResponsePayloadExtension(_extension, prp.Extension); err != nil { + return err } - if ccEvent != nil { - actions.TransactionActions.ChaincodeActions = append(actions.TransactionActions.ChaincodeActions, &peer.FilteredChaincodeAction{ChaincodeEvent: ccEvent}) + return nil +} + +func (block *RawBlock) decodeProposalResponsePayloadExtension(_extension *Extension, extension []byte) error { + cca := &peer.ChaincodeAction{} + if err := proto.Unmarshal(extension, cca); err != nil { + return errors.Wrap(err, "error decoding chaincode action") + } + + _extension.ChaincodeId = cca.ChaincodeId + + // skipping read/write sets in the action + // decode events + ccevt := &peer.ChaincodeEvent{} + if err := proto.Unmarshal(cca.Events, ccevt); err != nil { + return errors.Wrap(err, "error decoding chaincode event") } - return actions, nil + _chaincodeEvent := &ChaincodeEvent{} + _extension.Events = _chaincodeEvent + _chaincodeEvent.ChaincodeId = ccevt.ChaincodeId + _chaincodeEvent.TxId = ccevt.TxId + _chaincodeEvent.Timestamp = strconv.FormatInt(block.timestamp, 10) + _chaincodeEvent.EventName = ccevt.EventName + _chaincodeEvent.Payload = ccevt.Payload + + return nil } func getEnvelopeFromBlock(data []byte) (*common.Envelope, error) { diff --git a/internal/fabric/utils/blockdecoder_test.go b/internal/fabric/utils/blockdecoder_test.go new file mode 100644 index 0000000..7018a59 --- /dev/null +++ b/internal/fabric/utils/blockdecoder_test.go @@ -0,0 +1,100 @@ +// Copyright 2021 Kaleido +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "os" + "testing" + + "github.com/golang/protobuf/proto" //nolint + "github.com/hyperledger/fabric-protos-go/common" + "github.com/stretchr/testify/assert" +) + +func TestDecodeEndorserBlockWithEvents(t *testing.T) { + assert := assert.New(t) + content, _ := os.ReadFile("../../../test/resources/block-2.bin") + testblock := &common.Block{} + _ = proto.Unmarshal(content, testblock) + decoded, _, err := DecodeBlock(testblock) + assert.NoError(err) + assert.Equal(1, len(decoded.Data.Data)) + assert.Equal(byte(0), decoded.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER][0]) + + tx := decoded.Data.Data[0] + actions := tx.Payload.Data.Actions + assert.Equal(1, len(actions)) + action := actions[0] + assert.Equal("u0o4mkkzs6", action.Header.Creator.Mspid) + + apa := action.Payload.Action + assert.Equal("asset_transfer", apa.ProposalResponsePayload.Extension.ChaincodeId.Name) + assert.Equal("1.1.0.u0ypz4p14q", apa.ProposalResponsePayload.Extension.ChaincodeId.Version) + + event := apa.ProposalResponsePayload.Extension.Events + assert.Equal("asset_transfer", event.ChaincodeId) + assert.Regexp("[0-9a-f]{64}", event.TxId) + assert.Regexp("[0-9]+", event.Timestamp) + assert.Equal("AssetCreated", event.EventName) + assert.Equal("{\"ID\":\"asset05\",\"color\":\"red\",\"size\":10,\"owner\":\"Tom\",\"appraisedValue\":123000}", string(event.Payload)) + + cpp := action.Payload.ChaincodeProposalPayload + assert.Equal("asset_transfer", cpp.Input.ChaincodeSpec.ChaincodeId.Name) + assert.Equal("CreateAsset", cpp.Input.ChaincodeSpec.Input.Args[0]) +} + +func TestDecodeEndorserBlockLifecycleTxs(t *testing.T) { + assert := assert.New(t) + content, _ := os.ReadFile("../../../test/resources/block-1.bin") + testblock := &common.Block{} + _ = proto.Unmarshal(content, testblock) + decoded, _, err := DecodeBlock(testblock) + assert.NoError(err) + assert.Equal(1, len(decoded.Data.Data)) + assert.Equal(byte(0), decoded.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER][0]) + + tx := decoded.Data.Data[0] + actions := tx.Payload.Data.Actions + assert.Equal(1, len(actions)) + action := actions[0] + assert.Equal("u0o4mkkzs6", action.Header.Creator.Mspid) + + apa := action.Payload.Action + assert.Equal("_lifecycle", apa.ProposalResponsePayload.Extension.ChaincodeId.Name) + assert.Equal("syscc", apa.ProposalResponsePayload.Extension.ChaincodeId.Version) + + cpp := action.Payload.ChaincodeProposalPayload + assert.Equal("_lifecycle", cpp.Input.ChaincodeSpec.ChaincodeId.Name) + assert.Equal("UNDEFINED", cpp.Input.ChaincodeSpec.Type) + assert.Equal("ApproveChaincodeDefinitionForMyOrg", cpp.Input.ChaincodeSpec.Input.Args[0]) +} + +func TestGetEvents(t *testing.T) { + assert := assert.New(t) + content, _ := os.ReadFile("../../../test/resources/block-2.bin") + testblock := &common.Block{} + _ = proto.Unmarshal(content, testblock) + events := GetEvents(testblock) + assert.Equal(1, len(events)) + entry := events[0] + assert.Equal("asset_transfer", entry.ChaincodeId) + assert.Equal(uint64(16), entry.BlockNumber) + assert.Equal("AssetCreated", entry.EventName) + assert.Regexp("[0-9a-f]{64}", entry.TransactionId) + assert.Equal(0, entry.TransactionIndex) + assert.Equal(int64(1641861241312746000), entry.Timestamp) +} diff --git a/internal/fabric/utils/rawblock.go b/internal/fabric/utils/rawblock.go new file mode 100644 index 0000000..15c24b0 --- /dev/null +++ b/internal/fabric/utils/rawblock.go @@ -0,0 +1,124 @@ +// Copyright 2021 Kaleido +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "github.com/hyperledger/fabric-protos-go/common" + "github.com/hyperledger/fabric-protos-go/msp" + "github.com/hyperledger/fabric-protos-go/peer" +) + +type RawBlock struct { + Data *BlockData `json:"data"` + Header *common.BlockHeader `json:"header"` + Metadata *common.BlockMetadata `json:"metadata"` + + channelHeader *ChannelHeader + timestamp int64 +} + +type BlockData struct { + Data []*BlockDataEnvelope `json:"data"` +} + +type BlockDataEnvelope struct { + Payload *Payload `json:"payload"` + Signature string `json:"signature"` +} + +type Payload struct { + Data *PayloadData `json:"data"` + Header *PayloadHeader `json:"header"` +} + +type PayloadData struct { + Actions []*Action `json:"actions"` +} + +type Action struct { + Header *SignatureHeader `json:"header"` + Payload *ActionPayload `json:"payload"` +} + +type ActionPayload struct { + Action *ActionPayloadAction `json:"action"` + ChaincodeProposalPayload *ChaincodeProposalPayload `json:"chaincode_proposal_payload"` +} + +type ActionPayloadAction struct { + // Endorsements are not parsed for now + ProposalResponsePayload *ProposalResponsePayload `json:"proposal_response_payload"` +} + +type ProposalResponsePayload struct { + Extension *Extension `json:"extension"` + ProposalHash string `json:"proposal_hash"` +} + +type Extension struct { + ChaincodeId *peer.ChaincodeID `json:"chaincode_id"` + Events *ChaincodeEvent `json:"events"` + // Response + // Result +} + +type ChaincodeEvent struct { + ChaincodeId string + TxId string + Timestamp string + EventName string + Payload []byte +} + +type ChaincodeProposalPayload struct { + TransientMap *map[string][]byte `json:"TransientMap"` + Input *ProposalPayloadInput `json:"input"` +} + +type ProposalPayloadInput struct { + ChaincodeSpec *ChaincodeSpec `json:"chaincode_spec"` +} + +type ChaincodeSpec struct { + ChaincodeId *peer.ChaincodeID `json:"chaincode_id"` + Input *ChaincodeSpecInput `json:"input"` + Type string `json:"type"` +} + +type ChaincodeSpecInput struct { + Args []string `json:"args"` + IsInit bool `json:"is_init"` +} + +type PayloadHeader struct { + ChannelHeader *ChannelHeader `json:"channel_header"` + SignatureHeader *SignatureHeader `json:"signature_header"` +} + +type ChannelHeader struct { + ChannelId string `json:"channel_id"` + Epoch string `json:"epoch"` + Timestamp int64 `json:"timestamp"` + TxId string `json:"tx_id"` + Type string `json:"type"` + Version int `json:"version"` +} + +type SignatureHeader struct { + Creator *msp.SerializedIdentity `json:"creator"` + Nonce string `json:"nonce"` +} diff --git a/test/resources/block-1.bin b/test/resources/block-1.bin new file mode 100644 index 0000000000000000000000000000000000000000..8e49be807a5fbff1272ed079d316c090afa58cc0 GIT binary patch literal 4947 zcmeI0d5{xj7Qmf>8HNuig6Yd86ll?cMe6c&vfU$uXV*vcc;@y=bogKxVkDL z$O0b7imZZ&vM!WUfUGVm)GneT2#N^v-7z*_kXd={}gh&-%}V_y&-s6rf0(SE7xsbXzBT8Z76u5a5SbGk5e-J2j4Cp$w4Ku!8fhH6gLfXRxS5cwLfh?;-3gKg~ql%X7w zNvG>2qvo0Ram^o2Sy`8spnL(AW(h1n9f|@c%MzXxL11E7!c-Q=BsNIb6Dr!5Fq*@1 z0*U5GPVnX!IgrOJL6wlAtUVWF>nVt8dRaXFmoFvXQVs&sh$ToshV-e?^v|1(Vm<^X)AsmYaFeMal z`Y@H0qGZVHWhhnfN5fTx3k$hs92{A&Hx?X+7km|`7g0&<&3AL0r!7;{L<-K)0Mns&r`OwX*>Ww;a4x?Qee@J$ydY=cc76b#ZNPVar z$|GT8w(PXBjWg#+91B`;Rl;#PW@0lmBrvu*o60w?lNm>v&crfGi3`&Z7q*lEo65}zb7>QGujN23o5wZJSHIGTAYD$!@_`(qIQ;aT=k^+Vbl*(I;k~@($_*7PM z@YX`GRA$Lg$Xl(L88{Hu!9i1w%b8q0i#b>8W6XIXU;>G{^D=M<3(7@)7ZpwChqX5j z@cXyV|izfE&X8#wmm?+3l$qAZsYOL>kLOS*(CDndzj z{HAq&u}~JXya+~vo@0_TVjm}PecTD@UCRc7&dG8nugJXIH#t|7CLbdg9}ECJK>PZ< zcQn%5$eyA5?>Sd`kH1T2|7-SkyKm6itA?C2bANxLnWOsibK~;fFW!4-$h+?^Ufyp* z$E^IaWpr4WzDE4;%Q38m1lldvPJ@?>UUJ~R*hm^O9xFD$j3v z(Kcf2@@FfPSTvix^x*8JsS%wm*7c?h^+;xL_t#z8^axPrq>?+{!%Iw(8Ahl{X)nSPt#JJ@(GvM;E_;>B4pN=*+{ByY10)_guVT#A!>< z?$UNH?)Y^2LRaRBRaIuzjNxx>{p#77&)oXJuG>E8Tkk19>T6ax*%Ro}-ZjFoqjd1i zGxn|-%PrrzyLW|d0%PC#$WQv)vUq4?jEI#{P*X-_Xmh|RD z`znlh$KruA3e)yzy1+oC#Pb>r7zR#cYCy+^mYy??)zz-HQmvUiYY_PR@z%QXq}J*r z8`dg_-&(6X7e=H!$rn6Te^IX*LrmRm*Ht;M;{92yHTY9lE2I+YQET<7v2dv7^f^_k zsd)_rqSURVl8aPjI8?BfbXnAIr6+iYMQQ3tB0{cH?^vrd(1_P__Fa9mR992PXhY+<%ShA1;*&BJj60cULC zY%&fCBBF-X#+fB2g?pn6fl*WdF3Hh|58|^<*_|yN5>jtCmf^zHd?TKZ8<2))va*A) zhE>3G&wiCe3Eu`mkxgWjfue%s|{laE?$h*8v-pEa^sT!}?kIT3+WaKb!9`czEy zVXS0x=lI@eDH!0Q1mP29!kENrRICJ<=-#})G5HiY7I9cDz~4s^ENy~%xjL22P(9vjiV<t#8~fJd%6$s)J~0@&)D3Z@O>J>Tl$Uf4;d@J8t{K^X5IR zyKc&tUzIPM|A)uX)t~AwuikiPp#AYbeR+yuK-d03;F&>SIB4w{s2KtVg8?nA1BMOk z1ez6tz(3o;v&S?4*h!gpH78ZU@GbLV)Kk>c#kx%(iWVzvw$p)PB~(czg`a0qm3{)| z|2x_?CRL%rXM75u`Cr@z^}v~J{(o)RAH%)RSuvT6DYrYE%-F;FuuVyXK-}Xn>)Z@2 zk#I4csMz@;nvAgpeL4wS0uC2$HQ0jnvNi1_JXLEZ4@K>TbXKYL>c|K!<|!RW6x_HP zAn;1+|KlE5P{c!&8Jtd|2E7k92QzFQPD&~lP5&tFK@!xs;FzizybW1Jy;+ ziMq?-F0urRr!82YoDvy1(wDcJNxPp$+;qqm?xj@^X|{RdkSXYP7Kt?TL+p1J&23xE zzTP|R8*iSQU)s57)vKZv?moEcFwD?rCYR&metY-A*$YOd$Ikw2>-?oRys~od)Q@bR z9gy5xTD5I+rc138_I@*^y04$wI&9ZL{p*wc!QZUdz4_e-C#J9e91Lt?3|dYBr!;-; n#Y;Z^-M%@Nw(_=HJqJDroVWEg18LP-CN3X^4ZMF+{__6-Pq4dC literal 0 HcmV?d00001 diff --git a/test/resources/block-1.json b/test/resources/block-1.json new file mode 100644 index 0000000..5f2d578 --- /dev/null +++ b/test/resources/block-1.json @@ -0,0 +1,156 @@ +{ + "data": { + "data": [ + { + "payload": { + "data": { + "actions": [ + { + "header": { + "creator": { + "id_bytes": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNQakNDQWVXZ0F3SUJBZ0lVRnpmdzV0ZjY4V2lmNFlvRGFkUW9IaVRwQTlVd0NnWUlLb1pJemowRUF3SXcKR3pFWk1CY0dBMVVFQXhNUVptRmljbWxqTFdOaExYTmxjblpsY2pBZUZ3MHlNakF4TURReE5UQTBNREJhRncwegpNakF4TURJeE5URXhNREJhTUZ3eEN6QUpCZ05WQkFZVEFsVlRNUkF3RGdZRFZRUUhFd2RTWVd4bGFXZG9NUkF3CkRnWURWUVFLRXdkTFlXeGxhV1J2TVE0d0RBWURWUVFMRXdWaFpHMXBiakVaTUJjR0ExVUVBeE1RZFRCNFl6ZHAKY0RWdlpTMWhaRzFwYmpCWk1CTUdCeXFHU000OUFnRUdDQ3FHU000OUF3RUhBMElBQkVpSGRvQmVXUjF3REJ2VAphbE9jVUk4VjdnckRmTmN3SjV6c0RQZ3o2TFVLbDBQeXEwWDVYd3FGeHpEb2hxekExRW5Qd3k5WkdISDYvQ3c1CmhCRzVQb0dqZ2NVd2djSXdEZ1lEVlIwUEFRSC9CQVFEQWdlQU1Bd0dBMVVkRXdFQi93UUNNQUF3SFFZRFZSME8KQkJZRUZPS1UvaEQzVEpqZndzcDczWkhReGNRekJaQ1hNQjhHQTFVZEl3UVlNQmFBRk5tU1ZpTHcxUlBCeHFkMgo5Z3Q1aVZwNFc2bnVNR0lHQ0NvREJBVUdCd2dCQkZaN0ltRjBkSEp6SWpwN0ltaG1Ma0ZtWm1sc2FXRjBhVzl1Cklqb2lJaXdpYUdZdVJXNXliMnhzYldWdWRFbEVJam9pZFRCNFl6ZHBjRFZ2WlMxaFpHMXBiaUlzSW1obUxsUjUKY0dVaU9pSmhaRzFwYmlKOWZUQUtCZ2dxaGtqT1BRUURBZ05IQURCRUFpQkpsOUNYS1NOeVA3aUV5bFlGdlJXMApTRUgvZmpnVDJ2MGpwRzZnT2ZrMlJ3SWdMY0dxVXN1SURWVlF4djhIM1RXLzNVN29hbzdOUjk4b3l0SDhwQ1Q3CmZ6TT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=", + "mspid": "u0o4mkkzs6" + }, + "nonce": "7nnc/HDYOhtV8bE0h8MvRtDOG67WPHbK" + }, + "payload": { + "action": { + "endorsements": [ + { + "endorser": "Cgp1MG80bWtrenM2ErIGLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNNRENDQWRlZ0F3SUJBZ0lVT29XNGdwRGNxUHhTcjF3NlZIek9LL3hhUWxRd0NnWUlLb1pJemowRUF3SXcKR3pFWk1CY0dBMVVFQXhNUVptRmljbWxqTFdOaExYTmxjblpsY2pBZUZ3MHlNakF4TURReE5UQTBNREJhRncwegpNakF4TURJeE5URXhNREJhTUZVeEN6QUpCZ05WQkFZVEFsVlRNUkF3RGdZRFZRUUhFd2RTWVd4bGFXZG9NUkF3CkRnWURWUVFLRXdkTFlXeGxhV1J2TVEwd0N3WURWUVFMRXdSd1pXVnlNUk13RVFZRFZRUURFd3AxTUhoak4ybHcKTlc5bE1Ga3dFd1lIS29aSXpqMENBUVlJS29aSXpqMERBUWNEUWdBRWpySEJMNHYweEt4cHlqSDY5OGROUVNqdwpVNTYvTm0wRWJGT2NmQkNWZUNsNkpkYmxITWxLVXVBWjNMRGFYeGkzVnFHcy9uNUpsM3JuWmNCUTlFU282YU9CCnZqQ0J1ekFPQmdOVkhROEJBZjhFQkFNQ0I0QXdEQVlEVlIwVEFRSC9CQUl3QURBZEJnTlZIUTRFRmdRVTl2cEUKbFR1U29QTzgySExZTDUxRlNJWjAzbk13SHdZRFZSMGpCQmd3Rm9BVTJaSldJdkRWRThIR3AzYjJDM21KV25oYgpxZTR3V3dZSUtnTUVCUVlIQ0FFRVQzc2lZWFIwY25NaU9uc2lhR1l1UVdabWFXeHBZWFJwYjI0aU9pSWlMQ0pvClppNUZibkp2Ykd4dFpXNTBTVVFpT2lKMU1IaGpOMmx3Tlc5bElpd2lhR1l1Vkhsd1pTSTZJbkJsWlhJaWZYMHcKQ2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnSk9vYytZc1VUYVlDQ1JlaUM2aEF5RVpzMDdGK3BTYUZ3cWFJNExHOQpTdEVDSUY3MCthekYvMGwrekp4NEs0am0wblhKSmU4WlgyNGcweWhRSElLN3VvZzQKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=", + "signature": "MEUCIQD5CIoblyfVrorCcNNs6aBJR/6OSvSYh5u1q8PYmjVLzQIgNymazueIBVHfuhQGzgxedj+9dRH3xhIomvJcFzwhQo8=" + } + ], + "proposal_response_payload": { + "extension": { + "chaincode_id": { + "name": "_lifecycle", + "path": "", + "version": "syscc" + }, + "events": null, + "response": { + "message": "", + "payload": null, + "status": 200 + }, + "results": { + "data_model": "KV", + "ns_rwset": [ + { + "collection_hashed_rwset": [ + { + "collection_name": "_implicit_org_u0o4mkkzs6", + "hashed_rwset": "CiIKINmY4hJn4TTqDvmdH2fnY5kvS7hukktSB75gCx2M+a5mCiIKIHdTcJViae33560L4+ujsK5KNSiy+gEkV2SIvWXu+ihJEkQKIM5+hzOmJKb8oo79YZnTnjb3A4PP4h/gamLf1MjqvRcSGiDbxf6U3mo8F9FIHDUj8FrvQM9LeXPN6j+yhuwzIEvM2xJECiDZmOISZ+E06g75nR9n52OZL0u4bpJLUge+YAsdjPmuZhogaVLt4CLLrpWeG0qg2tR+hbyeHk6djfzwiFB5NIgLvnUSRAogWMngpfTX1i4VU6aNgSi5LYXMxDIzh6E/5TLGauO+vlcaIAjafEXLIEN35+QiSc2lcT+oZRFt27TLWhlJsuW0OKarEkQKIM/L0koUKrDJdj9JNW5uO/6OqmoUGAFHxDfIelhtCRv9GiDE+7B9e+HXAtVebC5od/Z9wlPbrYAziISfMwJIqAW03hJECiDerT51VumSWuIJt6PrO5/CRiRts1icS1kf6izIFA2qHBogGKMI9IifTm1ctHhIjYoR39H7yYzHkKXlkfJ0xFB1t1ISRAogd1NwlWJp7ffnrQvj66Owrko1KLL6ASRXZIi9Ze76KEkaIJkUMttz/t4T7b0qYbDg6SuxLzxIS+C2YMr4sMMe8UcG", + "pvt_rwset_hash": "nNb7DDbY/c96caMc406OG9Y5a82h8HZIZZSjBIpxh+o=" + } + ], + "namespace": "_lifecycle", + "rwset": { + "metadata_writes": [], + "range_queries_info": [], + "reads": [ + { + "key": "namespaces/fields/asset_transfer/Sequence", + "version": null + }, + { + "key": "namespaces/metadata/asset_transfer", + "version": null + } + ], + "writes": [] + } + }, + { + "collection_hashed_rwset": [], + "namespace": "lscc", + "rwset": { + "metadata_writes": [], + "range_queries_info": [], + "reads": [ + { + "key": "asset_transfer", + "version": null + } + ], + "writes": [] + } + } + ] + } + }, + "proposal_hash": "VWoXVGLfk8aMCTYsgSWrewsg0oJr01IxGYpJNsT+oYU=" + } + }, + "chaincode_proposal_payload": { + "TransientMap": {}, + "input": { + "chaincode_spec": { + "chaincode_id": { + "name": "_lifecycle", + "path": "", + "version": "" + }, + "input": { + "args": [ + "QXBwcm92ZUNoYWluY29kZURlZmluaXRpb25Gb3JNeU9yZw==", + "CAESDmFzc2V0X3RyYW5zZmVyGhAxLjEuMC51MHlwejRwMTRxSmQSYgpgYXNzZXRfdHJhbnNmZXItMS4xLjAudTB5cHo0cDE0cTo5NjliYTY3MzRkZTM0N2M4NTBiNTVkM2Y1ODU0ZTMwMDAxYzRjMjE5NWZkYzI1NTgwN2JhMDlhMDE4N2Y4M2E5" + ], + "decorations": {}, + "is_init": false + }, + "timeout": 0, + "type": "UNDEFINED" + } + } + } + } + } + ] + }, + "header": { + "channel_header": { + "channel_id": "default-channel", + "epoch": "0", + "extension": { + "chaincode_id": { + "name": "_lifecycle", + "path": "", + "version": "" + } + }, + "timestamp": "2022-01-04T15:26:09.292003136Z", + "tls_cert_hash": null, + "tx_id": "f73d545a63f200dceb966daa07e89831624fa36786ed75836157d8f2d68fd571", + "type": 3, + "version": 0 + }, + "signature_header": { + "creator": { + "id_bytes": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNQakNDQWVXZ0F3SUJBZ0lVRnpmdzV0ZjY4V2lmNFlvRGFkUW9IaVRwQTlVd0NnWUlLb1pJemowRUF3SXcKR3pFWk1CY0dBMVVFQXhNUVptRmljbWxqTFdOaExYTmxjblpsY2pBZUZ3MHlNakF4TURReE5UQTBNREJhRncwegpNakF4TURJeE5URXhNREJhTUZ3eEN6QUpCZ05WQkFZVEFsVlRNUkF3RGdZRFZRUUhFd2RTWVd4bGFXZG9NUkF3CkRnWURWUVFLRXdkTFlXeGxhV1J2TVE0d0RBWURWUVFMRXdWaFpHMXBiakVaTUJjR0ExVUVBeE1RZFRCNFl6ZHAKY0RWdlpTMWhaRzFwYmpCWk1CTUdCeXFHU000OUFnRUdDQ3FHU000OUF3RUhBMElBQkVpSGRvQmVXUjF3REJ2VAphbE9jVUk4VjdnckRmTmN3SjV6c0RQZ3o2TFVLbDBQeXEwWDVYd3FGeHpEb2hxekExRW5Qd3k5WkdISDYvQ3c1CmhCRzVQb0dqZ2NVd2djSXdEZ1lEVlIwUEFRSC9CQVFEQWdlQU1Bd0dBMVVkRXdFQi93UUNNQUF3SFFZRFZSME8KQkJZRUZPS1UvaEQzVEpqZndzcDczWkhReGNRekJaQ1hNQjhHQTFVZEl3UVlNQmFBRk5tU1ZpTHcxUlBCeHFkMgo5Z3Q1aVZwNFc2bnVNR0lHQ0NvREJBVUdCd2dCQkZaN0ltRjBkSEp6SWpwN0ltaG1Ma0ZtWm1sc2FXRjBhVzl1Cklqb2lJaXdpYUdZdVJXNXliMnhzYldWdWRFbEVJam9pZFRCNFl6ZHBjRFZ2WlMxaFpHMXBiaUlzSW1obUxsUjUKY0dVaU9pSmhaRzFwYmlKOWZUQUtCZ2dxaGtqT1BRUURBZ05IQURCRUFpQkpsOUNYS1NOeVA3aUV5bFlGdlJXMApTRUgvZmpnVDJ2MGpwRzZnT2ZrMlJ3SWdMY0dxVXN1SURWVlF4djhIM1RXLzNVN29hbzdOUjk4b3l0SDhwQ1Q3CmZ6TT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=", + "mspid": "u0o4mkkzs6" + }, + "nonce": "7nnc/HDYOhtV8bE0h8MvRtDOG67WPHbK" + } + } + }, + "signature": "MEQCIBvQlZX1D8raR9ykZLZv12D+NXNfJnX2hqKTuf9pPqmJAiAt2rOYmMYvgIP6+3Unmpa7Nbn0MUB4hJdUBrun+gAyAw==" + } + ] + }, + "header": { + "data_hash": "F7wm/5qRPjZKw1tU0jZVroKauQkOa9a2JXJXSc7pE80=", + "number": "6", + "previous_hash": "jhgYzJOk9+VTxTSSU1BxF3iCVT1tIjyBssLanzki/+0=" + }, + "metadata": { + "metadata": [ + "ChEKAggEEgsKCQoDAQIDEAQYChKxBwrmBgrJBgoKdTBvNG1ra3pzNhK6Bi0tLS0tQkVHSU4gQ0VSVElGSUNBVEUtLS0tLQpNSUlDTmpDQ0FkMmdBd0lCQWdJVWVZUHIxRnJ6SmRDcjlHRjhLRlRydlZ2amhkVXdDZ1lJS29aSXpqMEVBd0l3Ckd6RVpNQmNHQTFVRUF4TVFabUZpY21sakxXTmhMWE5sY25abGNqQWVGdzB5TWpBeE1EUXhOVEEwTURCYUZ3MHoKTWpBeE1ESXhOVEV4TURCYU1GZ3hDekFKQmdOVkJBWVRBbFZUTVJBd0RnWURWUVFIRXdkU1lXeGxhV2RvTVJBdwpEZ1lEVlFRS0V3ZExZV3hsYVdSdk1SQXdEZ1lEVlFRTEV3ZHZjbVJsY21WeU1STXdFUVlEVlFRREV3cDFNSFUxCk1uUnBiMkpuTUZrd0V3WUhLb1pJemowQ0FRWUlLb1pJemowREFRY0RRZ0FFUk12Nzc2ak9PV2htS1cxV0psZlYKYlBMOC9PSEZnRDNya2Z2S2NyNWhaSXExa2gzOVRMTkJHMkpVenVHa01DUHhHbXAwWUtxa25seSsvRFhGZXBFLwpmcU9Cd1RDQnZqQU9CZ05WSFE4QkFmOEVCQU1DQjRBd0RBWURWUjBUQVFIL0JBSXdBREFkQmdOVkhRNEVGZ1FVCjk1NFBDdUhCRms1MjFSMzhVbUlwM2hnd2FZa3dId1lEVlIwakJCZ3dGb0FVMlpKV0l2RFZFOEhHcDNiMkMzbUoKV25oYnFlNHdYZ1lJS2dNRUJRWUhDQUVFVW5zaVlYUjBjbk1pT25zaWFHWXVRV1ptYVd4cFlYUnBiMjRpT2lJaQpMQ0pvWmk1RmJuSnZiR3h0Wlc1MFNVUWlPaUoxTUhVMU1uUnBiMkpuSWl3aWFHWXVWSGx3WlNJNkltOXlaR1Z5ClpYSWlmWDB3Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnRHBjd1FtMmQ0akQycjNtVldaMUJqL3VzZmlDOTdaVnoKd05ERmZ6TkxPcklDSUJrOUF0aWplSGlYdHBLOERLU0Y0T0ZWSlcrRndQRDhKUGIwN1VPTXJDa0gKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQoSGHI4Bc44UiuUS8qJlXCqGKG012VHMxv+tBJGMEQCIDD1N05M8a5LG5+OnhdrKo710ZqqgtWy7YXvSvX8Z0/OAiAFk4hnAjzt/4N48K5F0RDl/jHZX1NVXbHpzeOoPmuB9g==", + "CgIIBA==", + "AA==", + "", + "CiDSOvF/8JM5BXXWkFD87FQh0dgyREcgOT6wHkEEpD9wQA==" + ] + } +} diff --git a/test/resources/block-2.bin b/test/resources/block-2.bin new file mode 100644 index 0000000000000000000000000000000000000000..885af6712021c26c91657930740c88ef932c47e7 GIT binary patch literal 5356 zcmeI0Yp@$-8OP~4OnZ1++e6yX_6T@-xF`gg-OY8&rQQ2(l1;L?C)2TRb~njx_Ll1= zLBT07T5vk$uBbDVi-;gdk%AOa6tD;aO1aHorNgB_1+nM{7`!B>J*{n#Rw&~a@@ezD z&+b0&Gy9+a?|lG0Htw4Ip`|#VA6wdV-!o4Bp?fA0-|_q{kKMickq09$eeZ$yFZNDM zzP)XA@wIu6ul+u8>1XW!uiPT7POUa^0{LY*-4Oo&>+01DvtZ ziFsr1Ui;3v+2iYfx&F#oQwwCJEVlL58B$d=O+`O#B`Qf}$zO(K7%fXEf`r7790ztH(Hw!Lsp%--Iga2og1{uBf;k+HRk%zt zM2JYp$9sFdb_ln-RooZWG7(eA*Bqi!;38I$>uV4-$aetKr;0o-F&Lbour4nY4O*29 zU5n;o)o4DZOJ-4*G)$o#sK;xA3{vRE(ip^(xHyRQ!R}zC8&7AZgFL}?$vzgtE3qt& zHPg5eYl^vSTPCRlk3lmpV&vh&HOc`zU2`a>!0suFz=XXel9(WoLItDDs4rCtQmrx^ zs}|^5i7D&VM6z5+6L}0sD%z?^VZ~?)RZF(Ys5=e0qk1n+;d&7-hWbceFm<+4Hu$I2lJ$9MwwVGfBE$ZQfyV*=~JG0wqAOx~U6 zqv(o|u^NzvYuRm3^B7E4ka2G{g}X~_M(}bD>rCIQ;dsTNEi8i+!#S=)W~mU%*xnLC zc#Uw*td{DE&jDd!Iys5sK8NKvF@@00VotA>ynHL2qcy=mJ7NzOBqP)=F{oC^IZkfa zrwUKuLV*PYMp5Z}Q!V6^kYw^|+-#~MQ)mmhq9NwG_8@MT5T6=XIWP(9Fp~GHbkc0uA z5(tirSNM3iB*k-)l+;M&LU@_1)_Zc?aBFES4bv=YSvnP=e05C4qZOY;v#e4L$$SwIw_42rQngv^?6bl27f;2=gH_;K+#X}zqT}-(UHk4!mt4uZ zzDc~W!yP@ov(9zV60l|o_*7P(%IZ^D9YI;$<^m+Z!ODXzY~pkp?i<~dYpdi(v2#yXy zZm~~$Al=;$cYP6`W99BZbJmRZBy{f=+kzm`Z_&$tcXowt{{K9DHhHXFJCX^)#Yra| zH~#xm)@fPi;jOMq=YWT%<)}Acq}O&Du1$xQV|sr%vaFbC3Y2NeU6gb(ZJ`Z`NK}wK z41dJr2p@2W_FL5Cy9hA&tQq&sd?FbJZZFG-3Ak_vJcu(H-@)K$EN zNmdQ@nhCGr;agp%8Yu^wrb_ljs9*;9C|dU`xm2Z)GBa2`X}Lr7Tp$?vm~xCI(+Y^1 zSh9s-5sqr*QN)vKp?tzhWDb&JN;SG>8nI9ol~ug%>rqgoWaR_EOi-VY9QSm2*yR67 zjwx&)M}eziJVn}KvK)g&ty3b=Fcqa^j5{XgxK=n$Rzq!r3*)sM5DB}DsTiHLLzq8X z>&s23q}Dke4TQ|PYTIbd=ZVr>FcP!{zuVvS9^zoZK-9?6qyF~vZ|YmN@7!_xL(-qG z-E_To$vZ#3`gyDHgJV*!&0hMubH6?^xnRpp&=niDUVi6OS6zOhIHlbG;xifiyB7yG zsyB)3t9QNb+Wnw+FYDj^uUE{MuU>od<~PqfZ`<+{-a~KLeBU!~AxjqA1sw-QCd2c$ zpLt*T8|fRlk(IYTdBs1H;@0$vznnOK{go@v8f{)MG4b9Uuw@Q77L1G??V3Aam(f`x zqvJ;}UgX+12fR8PJhHzJ-+M^m$A;4b8QJT@N+HpJ=|(>+6AhH1LlHXN=wv%uRn8nS z;U73X_)pXPw6UYvmeZw^n!Ve@drpX58iU_gt@$@ajEA^$f&CT+L)rIHp_|?MePYEBdJ<(j_KjYqOuete!?jAKi={$AM)t9Gl@t?fv zbznSx$=oOM7w@`#CmO%A1Rl5*X^l*N>7mhOcnV*b*wnja{Bw_<^xU)EUAz9eLEN6D z{SVK#)`O#mItH_j0!M)^O;? z*zpUyR7HzG2vw*;DvyW<-a!Z|RpOxt9*T;DP$?1*JW#|FNR{A$1v|OPy)$>_&YjU} z@74V}{-1N~bGhi{rVsN_&+!A|AhL%l9*&UE#~!t&pF-$re;X|z$@ zYP2!CRjIVxU;fVb|LBeH{_PL-fBG)+TMYP32KhRJ{P<1qa&JAg*47&N=Qoj$ z?gMcCr!edz|8wu``**=-i(kKY7x~M($e-VR==xvZdbjw%DTWi@Ca8hT(HsF~YTpV3 zjwAXqK|pKhz=Xr0!wn#3m#BXJ2!q~$Ue)KV8jm$8B_W&%*9|o;o1s)5o9Hsd0|Cw% zSWzh46^PMDNU01Ca7P^FBgw>EbPdK2j9O=-t5izQpmMlo%H$}&=HU!519~&5`q`w~ zN@)@f4*{V}R7coEtt1AcOSHZ|&mFsIh=6`*q6itv9Js z8*S{m#u$&ZVSNUSDw?P=lyS#gx${}CFL5~TIOo3$IG%_Egz_?V+I`XPH2iG~Ta^23 zV6l~rrjEgI%g)De)`X(cYN&C4!!L=2v#L;|ag&*@7nQ8+sBO|(WoR2qWgtVi**sS30I*v zm}b|!y1QGM0u1vRR?JYq+WWgfO(;+j!Lh^j-00{&%i~NMX5(;0SF5xQeygKhkX252FzO1Q$cE0vCuult0 zB(J8*ga7^9{^B*6L=>515XVfC=p-OCW!%#JCCOD1hwE-e2?@Iw?^Yt=3||UTlmOze z^%yGYjFgO$g?yqSvj|-1N>w`2Rc&hq_2opSQ~{R`S(gf;Ql&Q>rR~5(tR{N>9 zq;X@m?&Kp37WEF>q*mi{xkt#>NEbzciW0KO^#R(45?jV${u&(%3NT^7r6eE(2$MWx zioh98NC_80hfz6G)V7trYQOG3+2~2W) zcI1+Vb$UUNV@lty%j}pEtYIG&ae{ULI*wZ1krhm9v!z*$2#%C`_2#^o8SI&5b@Rn% z$e2~y>&S_vVdP;-G72Ak%)}r+U1kXr=78<_hNn@375D4{3VOM~@MmF{WX~?(u#KOe zT|hxEH((cCz@2@%=j<|=GHFhx2ZjMhin$rX40X&>MwKcvLR%P$Cj<&6QOoO%RjE^1 z>|&*d_V9YiHD|<-n+}G8%W3Q`h?FE8j$1A)*VloEYilp*x}~^18D}!k6_qb2=6M+g z94Y2T48v5*ajAhrLMz+L;Y9YBMULzG&S@@evNY^fv|+#8uZ1+KHLfzuJZG2*!Aaz5 z;ZVtFCxr`)I9Z<1V~2C0&{)ofGMg%x6>U^?iV8|c z*u+u0W07S|09(5r<9rxP+jMkIhMnDl)4*Fq4{G3Lb?wF0IzoO4`I*}<T;IVc@(w@(%KL9{bam`rJX@xZJ0ZvlmkhEIk*O&oI?H=bg{K0$#)Q z*mPwtu)kA7dUoxLDXXk=Jj37J22=a4F`vj&C7vPPkNcrH zAL+g~)r&_$t{Ly;;|iZ*&Kr*79Tyb&;a9<{#dC)1*xLjV@;6^aJ}!YfLp@mf*2M+n z-%H3}-7j2$Qz?Z1pWQ$E{d)%w@H_XA5APu#-@Ul_uTO<(WtPhp+q;12p4HUrb2{SW zaaCoMnlPj1Ds}~?&FqtgX-0Jv&b7Ae)vHco|ImQZfm*||f~mDRP-@aB*1L~@u5 zEijOBDDFbaCEL*=t}iLL=!rUGWz7*mVP$2}UC^0;kKE8|sO3O~x&735v&uIDI8g8% zP$Vdu7QN}0S8KRddMndOd7kyr4Z}Xo9DMVsS;mlsIVc^oOyU>YLAvB&rZO30OwmCm zHzwH?bFkl;pQt%_rmcD0nc(qT^X8%m2y}Gv+|OgVbANN;xeubKb?~QoMS#RLTw^Lr zV~*!0$xM%)=45Gd!vG%V7%m%z%`~iq9VQ&{DW2OL%gc7Fg}U8FfyJwF?$c>wU7}lf z7R`lSoQ&XZBB_h4F>%^ygO?+s-GHpfROV@Sv|OuLJlXlOu(~h|;Bk)OX@=o)UVvv! zjH6)~ch#`s$bAs@V}tX(p$02~n=~i+hOc-Xv4T>fM9B3vKX2tueUpa#y5h~GnJ}Wa zb-g4m7K0f-9&Erge^hN%69T=VY&DjY%HixP!h)lGp-{9%Z*$$nM;~J_*BbmMEYoSPAUpxf3Pd;|~|74v8{P z=mLTzsY|6#NRYuHnZUljmTE^A^u%m>QZnUw{b0Z23D&lgbSHZ5I8sJBY}C7(b#ph{ z`h(>{7*!OE)%kwRQZ2eX+d?{RaU%~$fm1EF&9c#wnR?CZ+I42#X;Y9N)7Y*35KcOfzuYoazu8punv&u3Nng?Mxf##X zq~c7{XiI6GYclUBEFV;k;uZNtZ+~L5v&L)DuERJyO56o_#(@CgOG%k&G%#y|+kwkA)?zTF*!Ji3k59*$x@ zTT0o`aEGj2l3kV=H%UnJDaNM*0x;QNQZTWNvf+!3u z0RucQVjKX2bn^i+cG_4djN`CsZy`~^NNr)eW>4x&neB>Zd12hZ1a$kT&~rpK2Mh}2 z73)5|F2EXd3?o`3u5EK6YScu5wTBinY?73k;+=`l)&~yf^s=}%XqwoR(rOJ9c7WF` zqny%P*McZGtmX%4-L0|>OseJ_ta1A`o2I9Taq#w${ zv{cE&TE-# z#DJ%1fN>XWEsz^{MezoeaQy04w;|x6gKdjO5qIVCrA0YP;M8wM4Agk7jiv{ z_x;`D#(l$%}GIQIwNS8xp|Qw)Lp( zlcqoJM`@|%M={rAW2v^|)Uk?nEp+HFB+luSX&ny9GDk1T`i2^FJzAp%_7(&#Hq#b# zqpGb5nIf~ODP5NsS48Z8JT+0&Jr!Mu*#2}!XsI&tw2|cvv9v9X$I?|Q`ZPrBm0d+$ z?7&obM8sYRz8M{TY9jV3%UmO3uLfU3Nhc9I_2yEI$43etduC%>HE265QK8$nyDh7a z?pCe3!c(%&;&emY!c3qV9G@0&GmDzBR0^4A;u|?CfR~FxTPVe66|zO2(f^a|d zJ#&}K?$`1f^6jGxKYO!$9#=m9ACB^&5dY4TE<$ literal 0 HcmV?d00001 From fd8372a0e31002b54d8b5d89cb1fdb7d7c5929f0 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Wed, 12 Jan 2022 21:59:30 -0500 Subject: [PATCH 3/4] Added optional timestamps to chaincode events by retrieving from Fabric Signed-off-by: Jim Zhang --- go.mod | 1 + go.sum | 2 + internal/events/eventstream.go | 50 +++++++++++-------- internal/events/eventstream_test.go | 30 +++++++---- internal/events/subscription.go | 22 ++++++++ internal/events/test_helper.go | 16 ++++++ internal/fabric/client/api.go | 2 + internal/fabric/client/client_ccp.go | 14 ++++++ .../client/client_gateway_clientside.go | 14 ++++++ internal/fabric/client/ledger.go | 13 +++++ mocks/fabric/client/rpc_client.go | 34 +++++++++++++ 11 files changed, 168 insertions(+), 30 deletions(-) diff --git a/go.mod b/go.mod index 5ee6d9b..09b9858 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/google/certificate-transparency-go v1.1.1 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gorilla/websocket v1.4.2 + github.com/hashicorp/golang-lru v0.5.4 github.com/hyperledger/fabric-config v0.0.7 // indirect github.com/hyperledger/fabric-protos-go v0.0.0-20201028172056-a3136dde2354 github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf diff --git a/go.sum b/go.sum index dd48945..845c262 100644 --- a/go.sum +++ b/go.sum @@ -334,6 +334,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/internal/events/eventstream.go b/internal/events/eventstream.go index 03525a5..2107b11 100644 --- a/internal/events/eventstream.go +++ b/internal/events/eventstream.go @@ -30,6 +30,7 @@ import ( eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" "github.com/hyperledger/firefly-fabconnect/internal/ws" + lru "github.com/hashicorp/golang-lru" log "github.com/sirupsen/logrus" ) @@ -73,6 +74,7 @@ type StreamInfo struct { Webhook *webhookActionInfo `json:"webhook,omitempty"` WebSocket *webSocketActionInfo `json:"websocket,omitempty"` Timestamps bool `json:"timestamps,omitempty"` // Include block timestamps in the events generated + TimestampCacheSize int `json:"timestampCacheSize,omitempty"` } type webhookActionInfo struct { @@ -91,26 +93,27 @@ type webSocketActionInfo struct { type eventHandler func(*eventData) type eventStream struct { - sm subscriptionManager - allowPrivateIPs bool - spec *StreamInfo - eventStream chan *eventData - eventHandler eventHandler - stopped bool - processorDone bool - pollingInterval time.Duration - pollerDone bool - inFlight uint64 - batchCond *sync.Cond - batchQueue *list.List - batchCount uint64 - initialRetryDelay time.Duration - backoffFactor float64 - updateInProgress bool - updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context) - updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped - action eventStreamAction - wsChannels ws.WebSocketChannels + sm subscriptionManager + allowPrivateIPs bool + spec *StreamInfo + eventStream chan *eventData + eventHandler eventHandler + stopped bool + processorDone bool + pollingInterval time.Duration + pollerDone bool + inFlight uint64 + batchCond *sync.Cond + batchQueue *list.List + batchCount uint64 + initialRetryDelay time.Duration + backoffFactor float64 + updateInProgress bool + updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context) + updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped + action eventStreamAction + wsChannels ws.WebSocketChannels + blockTimestampCache *lru.Cache } type eventStreamAction interface { @@ -149,6 +152,9 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS } else { spec.ErrorHandling = ErrorHandlingSkip } + if spec.TimestampCacheSize == 0 { + spec.TimestampCacheSize = DefaultTimestampCacheSize + } a = &eventStream{ sm: sm, @@ -164,6 +170,10 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS } a.eventHandler = a.handleEvent + if a.blockTimestampCache, err = lru.New(spec.TimestampCacheSize); err != nil { + return nil, errors.Errorf(errors.EventStreamsCreateStreamResourceErr, err) + } + if a.pollingInterval == 0 { // Let's us do this from UTs, without exposing it a.pollingInterval = 10 * time.Millisecond diff --git a/internal/events/eventstream_test.go b/internal/events/eventstream_test.go index 8542265..4616790 100644 --- a/internal/events/eventstream_test.go +++ b/internal/events/eventstream_test.go @@ -380,7 +380,7 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) { &StreamInfo{ BatchSize: 1, Webhook: &webhookActionInfo{}, - Timestamps: false, + Timestamps: true, }, db, 200) defer svr.Close() @@ -392,9 +392,15 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { + // the block event e1s := <-eventStream assert.Equal(1, len(e1s)) assert.Equal(uint64(11), e1s[0].BlockNumber) + // the chaincode event + e2s := <-eventStream + assert.Equal(1, len(e2s)) + assert.Equal(uint64(10), e2s[0].BlockNumber) + assert.Equal(int64(1000000), e2s[0].Timestamp) wg.Done() }() wg.Wait() @@ -417,7 +423,7 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) { _ = db.Init() sm, stream, svr, eventStream := newTestStreamForBatching( &StreamInfo{ - BatchSize: 1, + BatchSize: 2, Webhook: &webhookActionInfo{}, Timestamps: false, }, db, 200) @@ -432,11 +438,9 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) { wg.Add(1) go func() { e1s := <-eventStream - assert.Equal(1, len(e1s)) + assert.Equal(2, len(e1s)) assert.Equal(uint64(1), e1s[0].BlockNumber) - e2s := <-eventStream - assert.Equal(1, len(e2s)) - assert.Equal(uint64(11), e2s[0].BlockNumber) + assert.Equal(uint64(11), e1s[1].BlockNumber) wg.Done() }() wg.Wait() @@ -516,6 +520,10 @@ func TestProcessEventsEnd2EndWithReset(t *testing.T) { e1s := <-eventStream assert.Equal(1, len(e1s)) assert.Equal(uint64(11), e1s[0].BlockNumber) + // the chaincode event + e2s := <-eventStream + assert.Equal(1, len(e2s)) + assert.Equal(uint64(10), e2s[0].BlockNumber) wg.Done() }() wg.Wait() @@ -625,7 +633,7 @@ func TestPauseResumeAfterCheckpoint(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { <-eventStream } wg.Done() @@ -691,7 +699,7 @@ func TestPauseResumeBeforeCheckpoint(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { <-eventStream } wg.Done() @@ -731,7 +739,7 @@ func TestMarkStaleOnError(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { <-eventStream } wg.Done() @@ -809,7 +817,7 @@ func TestStoreCheckpointStoreError(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { - for i := 0; i < 1; i++ { + for i := 0; i < 2; i++ { <-eventStream } wg.Done() @@ -1079,6 +1087,7 @@ func TestUpdateStreamMissingWebhookURL(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { + <-eventStream <-eventStream wg.Done() }() @@ -1122,6 +1131,7 @@ func TestUpdateStreamInvalidWebhookURL(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) go func() { + <-eventStream <-eventStream wg.Done() }() diff --git a/internal/events/subscription.go b/internal/events/subscription.go index c1304e3..5507c49 100644 --- a/internal/events/subscription.go +++ b/internal/events/subscription.go @@ -140,6 +140,9 @@ func (s *subscription) processNewEvents() { EventName: ccEvent.EventName, Payload: ccEvent.Payload, } + if s.ep.stream.spec.Timestamps { + s.getEventTimestamp(event) + } if err := s.ep.processEventEntry(s.info, event); err != nil { log.Errorf("Failed to process event: %s", err) } @@ -147,6 +150,25 @@ func (s *subscription) processNewEvents() { } } +func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) { + // the key in the cache is the block number represented as a string + blockNumber := strconv.FormatUint(evt.BlockNumber, 10) + if ts, ok := s.ep.stream.blockTimestampCache.Get(blockNumber); ok { + // we found the timestamp for the block in our local cache, assert it's type and return, no need to query the chain + evt.Timestamp = ts.(int64) + return + } + // we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp + _, block, err := s.client.QueryBlock(s.info.ChannelId, evt.BlockNumber, s.info.Signer) + if err != nil { + log.Errorf("Unable to retrieve block[%s] timestamp: %s", blockNumber, err) + evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp. + return + } + evt.Timestamp = block.Timestamp + s.ep.stream.blockTimestampCache.Add(blockNumber, evt.Timestamp) +} + func (s *subscription) unsubscribe(deleting bool) { log.Infof("%s: Unsubscribing existing filter (deleting=%t)", s.info.ID, deleting) s.deleting = deleting diff --git a/internal/events/test_helper.go b/internal/events/test_helper.go index 291e590..d33eb9d 100644 --- a/internal/events/test_helper.go +++ b/internal/events/test_helper.go @@ -29,6 +29,7 @@ import ( "github.com/hyperledger/fabric-protos-go/peer" "github.com/hyperledger/firefly-fabconnect/internal/conf" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" "github.com/hyperledger/firefly-fabconnect/internal/kvstore" mockfabric "github.com/hyperledger/firefly-fabconnect/mocks/fabric/client" mockkvstore "github.com/hyperledger/firefly-fabconnect/mocks/kvstore" @@ -190,8 +191,18 @@ func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { Height: 10, }, } + rawBlock := &utils.RawBlock{ + Header: &common.BlockHeader{ + Number: uint64(20), + }, + } + block := &utils.Block{ + Number: uint64(20), + Timestamp: int64(1000000), + } rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil) rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil) + rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil) rpc.On("Unregister", mock.Anything).Return() go func() { @@ -203,6 +214,11 @@ func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient { blockEventChan <- &fab.BlockEvent{ Block: constructBlock(11), } + ccEventChan <- &fab.CCEvent{ + BlockNumber: uint64(10), + TxID: "3144a3ad43dcc11374832bbb71561320de81fd80d69cc8e26a9ea7d3240a5e84", + ChaincodeID: "asset_transfer", + } if len(withReset) > 0 { blockEventChan <- &fab.BlockEvent{ Block: constructBlock(11), diff --git a/internal/fabric/client/api.go b/internal/fabric/client/api.go index 9fdc45b..a30a906 100644 --- a/internal/fabric/client/api.go +++ b/internal/fabric/client/api.go @@ -22,6 +22,7 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" ) type ChaincodeSpec struct { @@ -54,6 +55,7 @@ type RPCClient interface { Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error) Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error) QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error) + QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) Unregister(*RegistrationWrapper) diff --git a/internal/fabric/client/client_ccp.go b/internal/fabric/client/client_ccp.go index ed995e8..1ba2253 100644 --- a/internal/fabric/client/client_ccp.go +++ b/internal/fabric/client/client_ccp.go @@ -29,6 +29,7 @@ import ( mspImpl "github.com/hyperledger/fabric-sdk-go/pkg/msp" "github.com/hyperledger/firefly-fabconnect/internal/errors" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" log "github.com/sirupsen/logrus" ) @@ -148,6 +149,19 @@ func (w *ccpRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockchai return result, nil } +func (w *ccpRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { + log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) + + rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) + if err != nil { + log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) + return nil, nil, err + } + + log.Tracef("RPC [%s] <-- success", channelId) + return rawblock, block, nil +} + // The returned registration must be closed when done func (w *ccpRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) { reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since) diff --git a/internal/fabric/client/client_gateway_clientside.go b/internal/fabric/client/client_gateway_clientside.go index b196836..a9543fa 100644 --- a/internal/fabric/client/client_gateway_clientside.go +++ b/internal/fabric/client/client_gateway_clientside.go @@ -25,6 +25,7 @@ import ( "github.com/hyperledger/fabric-sdk-go/pkg/gateway" "github.com/hyperledger/firefly-fabconnect/internal/errors" eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api" + "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" log "github.com/sirupsen/logrus" ) @@ -110,6 +111,19 @@ func (w *gwRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockchain return result, nil } +func (w *gwRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { + log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber) + + rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer) + if err != nil { + log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err) + return nil, nil, err + } + + log.Tracef("RPC [%s] <-- success", channelId) + return rawblock, block, nil +} + func (w *gwRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId) diff --git a/internal/fabric/client/ledger.go b/internal/fabric/client/ledger.go index 1b8c29f..a39dd00 100644 --- a/internal/fabric/client/ledger.go +++ b/internal/fabric/client/ledger.go @@ -60,6 +60,19 @@ func (l *ledgerClientWrapper) queryChainInfo(channelId, signer string) (*fab.Blo return result, nil } +func (l *ledgerClientWrapper) queryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { + client, err := l.getLedgerClient(channelId, signer) + if err != nil { + return nil, nil, errors.Errorf("Failed to get channel client. %s", err) + } + result, err := client.QueryBlock(blockNumber) + if err != nil { + return nil, nil, err + } + rawblock, block, err := utils.DecodeBlock(result) + return rawblock, block, err +} + func (l *ledgerClientWrapper) queryTransaction(channelId, signer, txId string) (map[string]interface{}, error) { client, err := l.getLedgerClient(channelId, signer) if err != nil { diff --git a/mocks/fabric/client/rpc_client.go b/mocks/fabric/client/rpc_client.go index 06a7084..2037146 100644 --- a/mocks/fabric/client/rpc_client.go +++ b/mocks/fabric/client/rpc_client.go @@ -9,6 +9,8 @@ import ( fab "github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab" mock "github.com/stretchr/testify/mock" + + utils "github.com/hyperledger/firefly-fabconnect/internal/fabric/utils" ) // RPCClient is an autogenerated mock type for the RPCClient type @@ -76,6 +78,38 @@ func (_m *RPCClient) Query(channelId string, signer string, chaincodeName string return r0, r1 } +// QueryBlock provides a mock function with given fields: channelId, blockNumber, signer +func (_m *RPCClient) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) { + ret := _m.Called(channelId, blockNumber, signer) + + var r0 *utils.RawBlock + if rf, ok := ret.Get(0).(func(string, uint64, string) *utils.RawBlock); ok { + r0 = rf(channelId, blockNumber, signer) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*utils.RawBlock) + } + } + + var r1 *utils.Block + if rf, ok := ret.Get(1).(func(string, uint64, string) *utils.Block); ok { + r1 = rf(channelId, blockNumber, signer) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*utils.Block) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(string, uint64, string) error); ok { + r2 = rf(channelId, blockNumber, signer) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // QueryChainInfo provides a mock function with given fields: channelId, signer func (_m *RPCClient) QueryChainInfo(channelId string, signer string) (*fab.BlockchainInfoResponse, error) { ret := _m.Called(channelId, signer) From e96ad596954e2b69d141841f0ec479613cc7b6b7 Mon Sep 17 00:00:00 2001 From: Jim Zhang Date: Wed, 12 Jan 2022 22:33:33 -0500 Subject: [PATCH 4/4] Updated openapi spec Signed-off-by: Jim Zhang --- internal/events/api/event.go | 3 ++- internal/events/evtprocessor.go | 2 +- openapi/spec.yaml | 36 ++++++++++++++++++++++++++++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/internal/events/api/event.go b/internal/events/api/event.go index 77f5d8b..9cb9417 100644 --- a/internal/events/api/event.go +++ b/internal/events/api/event.go @@ -22,6 +22,7 @@ const ( EventPayloadType_Bytes = "bytes" // default data type of the event payload, no special processing is done before returning to the subscribing client EventPayloadType_String = "string" // event payload will be an UTF-8 encoded string EventPayloadType_StringifiedJSON = "stringifiedJSON" // event payload will be a structured map with UTF-8 encoded string values + EventPayloadType_JSON = "json" // equivalent to "stringifiedJSON" ) // persistedFilter is the part of the filter we record to storage @@ -52,7 +53,7 @@ type SubscriptionInfo struct { Signer string `json:"signer"` FromBlock string `json:"fromBlock,omitempty"` Filter persistedFilter `json:"filter"` - PayloadType string `json:"payloadType,omitempty"` // optional. data type of the payload bytes; "bytes", "string" or "stringifiedJSON". Default to "bytes" + PayloadType string `json:"payloadType,omitempty"` // optional. data type of the payload bytes; "bytes", "string" or "stringifiedJSON/json". Default to "bytes" } // GetID returns the ID (for sorting) diff --git a/internal/events/evtprocessor.go b/internal/events/evtprocessor.go index 2020c8d..4febf65 100644 --- a/internal/events/evtprocessor.go +++ b/internal/events/evtprocessor.go @@ -73,7 +73,7 @@ func (ep *evtProcessor) processEventEntry(subInfo *api.SubscriptionInfo, entry * switch payloadType { case api.EventPayloadType_String: entry.Payload = string(entry.Payload.([]byte)) - case api.EventPayloadType_StringifiedJSON: + case api.EventPayloadType_StringifiedJSON, api.EventPayloadType_JSON: structuredMap := make(map[string]interface{}) err := json.Unmarshal(entry.Payload.([]byte), &structuredMap) if err != nil { diff --git a/openapi/spec.yaml b/openapi/spec.yaml index 75763bd..ef22022 100644 --- a/openapi/spec.yaml +++ b/openapi/spec.yaml @@ -393,10 +393,44 @@ components: enum: - websocket - webhook + default: websocket websocket: oneOf: - $ref: "#/components/schemas/websocket_info" - $ref: "#/components/schemas/webhook_info" + suspended: + type: boolean + default: false + description: if set to 'true', the stream will be suspended + batchSize: + type: integer + default: 1 + description: how many events should be packed in each event batch to deliver to the client. Range is 1-1000 + batchTimeoutMS: + type: integer + default: 5000 + description: if there are pending events to deliver, but the batch size has not been reached, this is the maximum amount of milliseconds to wait before delivering the current batch + errorHandling: + type: string + description: when the delivery should be blocked when the event listener client failed to take delivery, or skip and continue + enum: + - block + - skip + default: skip + retryTimeoutSec: + type: integer + description: total amount of time (in seconds) to retry a failed event delivery + blockedReryDelaySec: + type: integer + description: amount of time (in seconds) to wait before retrying a failed delivery + timestamps: + type: boolean + default: false + description: Chaincode events from Fabric do not contain timestamps. If set to 'true', FabConnect to make a call to Fabric to download the block to obtain the timestamp, and set on the event. Note doing this incurs time overhead in events processing + timestampCacheSize: + type: integer + default: 1000 + description: The size of the internal cache for the blocknumber <-> timestamp map subscription_input: type: "object" properties: @@ -416,7 +450,7 @@ components: type: string default: string enum: - - stringifiedJSON + - json - string filter: type: object