diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 7e9dd73d2..da9ca7809 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -271,7 +271,7 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities { return e.capabilities } -func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) { ethLocation, err := e.parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -286,7 +286,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N if !ok { return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") } - sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI) + sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI, lastProtocolID) if err != nil { return "", err @@ -874,7 +874,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio return result, err } -func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) { +func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) { var location *Location namespace := listener.Namespace if listener.Location != nil { @@ -893,7 +893,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr if listener.Options != nil { firstEvent = listener.Options.FirstEvent } - result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi) + result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID) if err != nil { return err } diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index 83a624056..01a24f7d4 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -906,7 +906,7 @@ func TestInitAllExistingStreams(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -964,7 +964,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -1022,7 +1022,7 @@ func TestInitAllExistingStreamsOld(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 4, httpmock.GetTotalCallCount()) @@ -1080,7 +1080,7 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10416", err) } @@ -2027,7 +2027,7 @@ func TestAddSubscription(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -2062,7 +2062,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -2097,7 +2097,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10311", err) } @@ -2118,7 +2118,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -2147,7 +2147,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10111", err) assert.Regexp(t, "pop", err) @@ -4005,7 +4005,7 @@ func TestAddSubBadLocation(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := e.AddFireflySubscription(e.ctx, ns, contract) + _, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10310", err) } @@ -4025,9 +4025,15 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", httpmock.NewJsonResponderOrPanic(200, []subscription{})) httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", - httpmock.NewJsonResponderOrPanic(200, subscription{ - ID: "sub1", - })) + func(r *http.Request) (*http.Response, error) { + var s subscription + err := json.NewDecoder(r.Body).Decode(&s) + assert.NoError(t, err) + assert.Equal(t, "19", s.FromBlock) + return httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub1", + })(r) + }) httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(2)) utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") @@ -4055,7 +4061,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := e.AddFireflySubscription(e.ctx, ns, contract) + subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "000000000020/000000/000000") assert.NoError(t, err) assert.NotNil(t, e.subs.GetSubscription("sub1")) @@ -4103,7 +4109,7 @@ func TestAddFireflySubscriptionV1(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.NotNil(t, e.subs.GetSubscription("sub1")) } @@ -4147,7 +4153,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10465", err) } @@ -4189,7 +4195,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } @@ -4231,7 +4237,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } @@ -4273,7 +4279,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10111", err) } diff --git a/internal/blockchain/ethereum/eventstream.go b/internal/blockchain/ethereum/eventstream.go index 498f3117d..f3853e5ea 100644 --- a/internal/blockchain/ethereum/eventstream.go +++ b/internal/blockchain/ethereum/eventstream.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -21,6 +21,8 @@ import ( "crypto/sha256" "encoding/hex" "fmt" + "strconv" + "strings" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly-common/pkg/ffresty" @@ -201,13 +203,26 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) { +func latestOrLastBlock(protocolID string) string { + if len(protocolID) > 0 { + blockStr := strings.Split(protocolID, "/")[0] + blockNumber, err := strconv.ParseUint(blockStr, 10, 64) + if err == nil { + // We jump back on block from the last event, to minimize re-delivery while ensuring + // we get all events since the last delivered (including subsequent events in the same block) + return strconv.FormatUint(blockNumber-1, 10) + } + } + return "latest" +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) { // Map FireFly "firstEvent" values to Ethereum "fromBlock" values switch firstEvent { case string(core.SubOptsFirstEventOldest): firstEvent = "0" case string(core.SubOptsFirstEventNewest): - firstEvent = "latest" + firstEvent = latestOrLastBlock(lastProtocolID) } sub := subscription{ Name: subName, @@ -244,7 +259,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry, lastProtocolID string) (sub *subscription, err error) { // Include a hash of the instance path in the subscription, so if we ever point at a different // contract configuration, we re-subscribe from block 0. // We don't need full strength hashing, so just use the first 16 chars for readability. @@ -286,7 +301,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace name = v1Name } location := &Location{Address: instancePath} - if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil { + if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, lastProtocolID); err != nil { return nil, err } log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID) diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 49c2c2ec5..04d15a680 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -19,6 +19,8 @@ package fabric import ( "context" "fmt" + "strconv" + "strings" "github.com/go-resty/resty/v2" "github.com/hyperledger/firefly-common/pkg/ffresty" @@ -177,10 +179,25 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) ( return sub.Name, nil } -func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent string) (*subscription, error) { +func newestOrLastBlock(protocolID string) string { + if len(protocolID) > 0 { + blockStr := strings.Split(protocolID, "/")[0] + blockNumber, err := strconv.ParseUint(blockStr, 10, 64) + if err == nil { + // We jump back on block from the last event, to minimize re-delivery while ensuring + // we get all events since the last delivered (including subsequent events in the same block) + return strconv.FormatUint(blockNumber-1, 10) + } + } + return "newest" +} + +func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) { // Map FireFly "firstEvent" values to Fabric "fromBlock" values if firstEvent == string(core.SubOptsFirstEventOldest) { firstEvent = "0" + } else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) { + firstEvent = newestOrLastBlock(lastProtocolID) } sub := subscription{ Name: name, @@ -221,7 +238,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok return nil } -func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event string) (sub *subscription, err error) { +func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event, lastProtocolID string) (sub *subscription, err error) { existingSubs, err := s.getSubscriptions(ctx) if err != nil { return nil, err @@ -250,7 +267,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace if version == 1 { name = v1Name } - if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent); err != nil { + if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent, lastProtocolID); err != nil { return nil, err } log.L(ctx).Infof("%s subscription: %s", event, sub.ID) diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 54cdf8b29..b0b4f1cf1 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -411,7 +411,7 @@ func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsT return nil } -func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) { fabricOnChainLocation, err := parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -437,7 +437,7 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Nam if !ok { return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") } - sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent) + sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent, lastProtocolID) if err != nil { return "", err } @@ -925,7 +925,7 @@ func encodeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, return result, err } -func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener) error { +func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) error { namespace := listener.Namespace location, err := parseContractLocation(ctx, listener.Location) if err != nil { @@ -933,7 +933,7 @@ func (f *Fabric) AddContractListener(ctx context.Context, listener *core.Contrac } subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID) - result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent) + result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent, lastProtocolID) if err != nil { return err } diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 1b315e453..1c5cc7550 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -442,7 +442,7 @@ func TestInitAllExistingStreams(t *testing.T) { <-toServer - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -500,7 +500,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) { <-toServer - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -525,6 +525,7 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) { json.NewDecoder(req.Body).Decode(&body) assert.Equal(t, "firefly", body["channel"]) assert.Equal(t, nil, body["chaincode"]) + assert.Equal(t, "9", body["fromBlock"]) return httpmock.NewJsonResponderOrPanic(200, body)(req) }) @@ -554,7 +555,7 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "000000000010/4763a0c50e3bba7cef1a7ba35dd3f9f3426bb04d0156f326e84ec99387c4746d") assert.NoError(t, err) } @@ -604,7 +605,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10465", err) } @@ -646,7 +647,7 @@ func TestAddFireflySubscriptionBadOptions(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -687,7 +688,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -729,7 +730,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "pop", err) } @@ -784,7 +785,7 @@ func TestAddAndRemoveFireflySubscriptionDeprecatedSubName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := e.AddFireflySubscription(e.ctx, ns, contract) + subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -846,7 +847,7 @@ func TestAddFireflySubscriptionInvalidSubName(t *testing.T) { <-toServer ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10416", err) } @@ -860,7 +861,7 @@ func TestAddFFSubscriptionBadLocation(t *testing.T) { FirstEvent: "oldest", } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := e.AddFireflySubscription(e.ctx, ns, contract) + _, err := e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "F10310", err) } @@ -1091,7 +1092,7 @@ func TestSubQueryCreateError(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.Regexp(t, "FF10284.*pop", err) } @@ -1139,7 +1140,7 @@ func TestSubQueryCreate(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} e.streamID["ns1"] = "es12345" - _, err = e.AddFireflySubscription(e.ctx, ns, contract) + _, err = e.AddFireflySubscription(e.ctx, ns, contract, "") assert.NoError(t, err) } @@ -1914,7 +1915,7 @@ func TestAddSubscription(t *testing.T) { return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req) }) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -1948,7 +1949,7 @@ func TestAddSubscriptionNoChannel(t *testing.T) { return httpmock.NewJsonResponderOrPanic(200, &subscription{})(req) }) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310.*channel", err) } @@ -1971,7 +1972,7 @@ func TestAddSubscriptionNoLocation(t *testing.T) { }, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310.*channel", err) } @@ -1992,7 +1993,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -2022,7 +2023,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := e.AddContractListener(context.Background(), sub) + err := e.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10284.*pop", err) } diff --git a/internal/blockchain/tezos/tezos.go b/internal/blockchain/tezos/tezos.go index e9b1a2383..9f2225e5b 100644 --- a/internal/blockchain/tezos/tezos.go +++ b/internal/blockchain/tezos/tezos.go @@ -261,7 +261,11 @@ func (t *Tezos) Capabilities() *blockchain.Capabilities { return t.capabilities } -func (t *Tezos) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) { +func (t *Tezos) AddFireflySubscription(ctx context.Context, + namespace *core.Namespace, + contract *blockchain.MultipartyContract, + _ string, // Tezos lexicographically sortable protocol IDs for not yet implemented for events +) (string, error) { tezosLocation, err := t.parseContractLocation(ctx, contract.Location) if err != nil { return "", err @@ -406,7 +410,11 @@ func (t *Tezos) NormalizeContractLocation(ctx context.Context, ntype blockchain. return t.encodeContractLocation(ctx, parsed) } -func (t *Tezos) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) { +func (t *Tezos) AddContractListener( + ctx context.Context, + listener *core.ContractListener, + _ string, // Tezos lexicographically sortable protocol IDs for not yet implemented for events +) (err error) { var location *Location if listener.Location != nil { location, err = t.parseContractLocation(ctx, listener.Location) diff --git a/internal/blockchain/tezos/tezos_test.go b/internal/blockchain/tezos/tezos_test.go index 2a55d109f..ee9c3dd8a 100644 --- a/internal/blockchain/tezos/tezos_test.go +++ b/internal/blockchain/tezos/tezos_test.go @@ -548,7 +548,7 @@ func TestInitAllExistingStreams(t *testing.T) { assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.NoError(t, err) assert.Equal(t, 3, httpmock.GetTotalCallCount()) @@ -827,7 +827,7 @@ func TestAddSubscription(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -856,7 +856,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewJsonResponderOrPanic(200, &subscription{})) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.NoError(t, err) } @@ -877,7 +877,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) { Event: &core.FFISerializedEvent{}, } - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10310", err) } @@ -905,7 +905,7 @@ func TestAddSubscriptionFail(t *testing.T) { httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, httpmock.NewStringResponder(500, "pop")) - err := tz.AddContractListener(context.Background(), sub) + err := tz.AddContractListener(context.Background(), sub, "") assert.Regexp(t, "FF10283.*pop", err) } @@ -1549,7 +1549,7 @@ func TestAddSubBadLocation(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err := tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err := tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10310", err) } @@ -1597,7 +1597,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - subID, err := tz.AddFireflySubscription(tz.ctx, ns, contract) + subID, err := tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.NoError(t, err) assert.NotNil(t, tz.subs.GetSubscription("sub1")) @@ -1641,7 +1641,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10283", err) } @@ -1681,7 +1681,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) { } ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} - _, err = tz.AddFireflySubscription(tz.ctx, ns, contract) + _, err = tz.AddFireflySubscription(tz.ctx, ns, contract, "") assert.Regexp(t, "FF10283", err) } diff --git a/internal/multiparty/manager.go b/internal/multiparty/manager.go index 811e56141..5ac4ac35b 100644 --- a/internal/multiparty/manager.go +++ b/internal/multiparty/manager.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 //