Skip to content

Commit

Permalink
Update all blockchain connectors to handle lastProtocolID
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jun 24, 2024
1 parent 9b553ff commit 752c510
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 65 deletions.
8 changes: 4 additions & 4 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (e *Ethereum) Capabilities() *blockchain.Capabilities {
return e.capabilities
}

func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) {
func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) {
ethLocation, err := e.parseContractLocation(ctx, contract.Location)
if err != nil {
return "", err
Expand All @@ -286,7 +286,7 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI)
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI, lastProtocolID)

if err != nil {
return "", err
Expand Down Expand Up @@ -874,7 +874,7 @@ func (e *Ethereum) encodeContractLocation(ctx context.Context, location *Locatio
return result, err
}

func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener) (err error) {
func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
var location *Location
namespace := listener.Namespace
if listener.Location != nil {
Expand All @@ -893,7 +893,7 @@ func (e *Ethereum) AddContractListener(ctx context.Context, listener *core.Contr
if listener.Options != nil {
firstEvent = listener.Options.FirstEvent
}
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi)
result, err := e.streams.createSubscription(ctx, location, e.streamID[namespace], subName, firstEvent, abi, lastProtocolID)
if err != nil {
return err
}
Expand Down
44 changes: 25 additions & 19 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,7 @@ func TestInitAllExistingStreams(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -964,7 +964,7 @@ func TestInitAllExistingStreamsV1(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1022,7 +1022,7 @@ func TestInitAllExistingStreamsOld(t *testing.T) {
<-toServer

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)

assert.Equal(t, 4, httpmock.GetTotalCallCount())
Expand Down Expand Up @@ -1080,7 +1080,7 @@ func TestInitAllExistingStreamsInvalidName(t *testing.T) {

<-toServer
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10416", err)
}

Expand Down Expand Up @@ -2027,7 +2027,7 @@ func TestAddSubscription(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2062,7 +2062,7 @@ func TestAddSubscriptionWithoutLocation(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.NoError(t, err)
}
Expand Down Expand Up @@ -2097,7 +2097,7 @@ func TestAddSubscriptionBadParamDetails(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewJsonResponderOrPanic(200, &subscription{}))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10311", err)
}
Expand All @@ -2118,7 +2118,7 @@ func TestAddSubscriptionBadLocation(t *testing.T) {
Event: &core.FFISerializedEvent{},
}

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10310", err)
}
Expand Down Expand Up @@ -2147,7 +2147,7 @@ func TestAddSubscriptionFail(t *testing.T) {
httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
httpmock.NewStringResponder(500, "pop"))

err := e.AddContractListener(context.Background(), sub)
err := e.AddContractListener(context.Background(), sub, "")

assert.Regexp(t, "FF10111", err)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -4005,7 +4005,7 @@ func TestAddSubBadLocation(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err := e.AddFireflySubscription(e.ctx, ns, contract)
_, err := e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10310", err)
}

Expand All @@ -4025,9 +4025,15 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
}))
func(r *http.Request) (*http.Response, error) {
var s subscription
err := json.NewDecoder(r.Body).Decode(&s)
assert.NoError(t, err)
assert.Equal(t, "19", s.FromBlock)
return httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
})(r)
})
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(2))

utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
Expand Down Expand Up @@ -4055,7 +4061,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
subID, err := e.AddFireflySubscription(e.ctx, ns, contract)
subID, err := e.AddFireflySubscription(e.ctx, ns, contract, "000000000020/000000/000000")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))

Expand Down Expand Up @@ -4103,7 +4109,7 @@ func TestAddFireflySubscriptionV1(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))
}
Expand Down Expand Up @@ -4147,7 +4153,7 @@ func TestAddFireflySubscriptionEventstreamFail(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10465", err)
}

Expand Down Expand Up @@ -4189,7 +4195,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4231,7 +4237,7 @@ func TestAddFireflySubscriptionCreateError(t *testing.T) {
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down Expand Up @@ -4273,7 +4279,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) {

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
_, err = e.AddFireflySubscription(e.ctx, ns, contract, "")
assert.Regexp(t, "FF10111", err)
}

Expand Down
25 changes: 20 additions & 5 deletions internal/blockchain/ethereum/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -21,6 +21,8 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"
"strings"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand Down Expand Up @@ -201,13 +203,26 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry) (*subscription, error) {
func latestOrLastBlock(protocolID string) string {
if len(protocolID) > 0 {
blockStr := strings.Split(protocolID, "/")[0]
blockNumber, err := strconv.ParseUint(blockStr, 10, 64)
if err == nil {
// We jump back on block from the last event, to minimize re-delivery while ensuring
// we get all events since the last delivered (including subsequent events in the same block)
return strconv.FormatUint(blockNumber-1, 10)
}
}
return "latest"
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, subName, firstEvent string, abi *abi.Entry, lastProtocolID string) (*subscription, error) {
// Map FireFly "firstEvent" values to Ethereum "fromBlock" values
switch firstEvent {
case string(core.SubOptsFirstEventOldest):
firstEvent = "0"
case string(core.SubOptsFirstEventNewest):
firstEvent = "latest"
firstEvent = latestOrLastBlock(lastProtocolID)
}
sub := subscription{
Name: subName,
Expand Down Expand Up @@ -244,7 +259,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok
return nil
}

func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry) (sub *subscription, err error) {
func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, instancePath, firstEvent, stream string, abi *abi.Entry, lastProtocolID string) (sub *subscription, err error) {
// Include a hash of the instance path in the subscription, so if we ever point at a different
// contract configuration, we re-subscribe from block 0.
// We don't need full strength hashing, so just use the first 16 chars for readability.
Expand Down Expand Up @@ -286,7 +301,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
name = v1Name
}
location := &Location{Address: instancePath}
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, name, firstEvent, abi, lastProtocolID); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", abi.Name, sub.ID)
Expand Down
25 changes: 21 additions & 4 deletions internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -19,6 +19,8 @@ package fabric
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
Expand Down Expand Up @@ -177,10 +179,25 @@ func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (
return sub.Name, nil
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent string) (*subscription, error) {
func newestOrLastBlock(protocolID string) string {
if len(protocolID) > 0 {
blockStr := strings.Split(protocolID, "/")[0]
blockNumber, err := strconv.ParseUint(blockStr, 10, 64)
if err == nil {
// We jump back on block from the last event, to minimize re-delivery while ensuring
// we get all events since the last delivered (including subsequent events in the same block)
return strconv.FormatUint(blockNumber-1, 10)
}
}
return "newest"
}

func (s *streamManager) createSubscription(ctx context.Context, location *Location, stream, name, event, firstEvent, lastProtocolID string) (*subscription, error) {
// Map FireFly "firstEvent" values to Fabric "fromBlock" values
if firstEvent == string(core.SubOptsFirstEventOldest) {
firstEvent = "0"
} else if firstEvent == "" || firstEvent == string(core.SubOptsFirstEventNewest) {
firstEvent = newestOrLastBlock(lastProtocolID)
}
sub := subscription{
Name: name,
Expand Down Expand Up @@ -221,7 +238,7 @@ func (s *streamManager) deleteSubscription(ctx context.Context, subID string, ok
return nil
}

func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event string) (sub *subscription, err error) {
func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace string, version int, location *Location, firstEvent, stream, event, lastProtocolID string) (sub *subscription, err error) {
existingSubs, err := s.getSubscriptions(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -250,7 +267,7 @@ func (s *streamManager) ensureFireFlySubscription(ctx context.Context, namespace
if version == 1 {
name = v1Name
}
if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent); err != nil {
if sub, err = s.createSubscription(ctx, location, stream, name, event, firstEvent, lastProtocolID); err != nil {
return nil, err
}
log.L(ctx).Infof("%s subscription: %s", event, sub.ID)
Expand Down
8 changes: 4 additions & 4 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsT
return nil
}

func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract) (string, error) {
func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Namespace, contract *blockchain.MultipartyContract, lastProtocolID string) (string, error) {
fabricOnChainLocation, err := parseContractLocation(ctx, contract.Location)
if err != nil {
return "", err
Expand All @@ -437,7 +437,7 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Nam
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent)
sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent, lastProtocolID)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -925,15 +925,15 @@ func encodeContractLocation(ctx context.Context, ntype blockchain.NormalizeType,
return result, err
}

func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener) error {
func (f *Fabric) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) error {
namespace := listener.Namespace
location, err := parseContractLocation(ctx, listener.Location)
if err != nil {
return err
}

subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent)
result, err := f.streams.createSubscription(ctx, location, f.streamID[namespace], subName, listener.Event.Name, listener.Options.FirstEvent, lastProtocolID)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 752c510

Please sign in to comment.