Skip to content

Commit

Permalink
Get previous blockchain event before establishing listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Broadhurst <[email protected]>
  • Loading branch information
peterbroadhurst committed Jun 24, 2024
1 parent 16aec0b commit 9b553ff
Show file tree
Hide file tree
Showing 51 changed files with 189 additions and 76 deletions.
22 changes: 20 additions & 2 deletions internal/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,24 @@ func (cm *contractManager) checkContractListenerExists(ctx context.Context, list
log.L(ctx).Debugf("Validated listener %s:%s (BackendID=%s)", listener.Signature, listener.ID, listener.BackendID)
return nil
}
if err = cm.blockchain.AddContractListener(ctx, listener); err != nil {

// For the case that we're establishing a listener from "latest" we obtain the protocol ID
// of the latest event confirmed from the blockchain for a given subscription.
// This protocolID should be parsed and used by the blockchain plugin if SubOptsFirstEventNewest
// is passed through, and the listener does not exist.
fb := database.BlockchainEventQueryFactory.NewFilter(ctx).Sort("-protocolid").Limit(1)
latestEvents, _, err := cm.database.GetBlockchainEvents(ctx, cm.namespace, fb.Eq(
"listener", listener.ID,
))
if err != nil {
return err
}
lastProtocolID := ""
if len(latestEvents) > 0 {
lastProtocolID = latestEvents[0].ProtocolID
}

if err = cm.blockchain.AddContractListener(ctx, listener, lastProtocolID); err != nil {
return err
}
return cm.database.UpdateContractListener(ctx, cm.namespace, listener.ID,
Expand Down Expand Up @@ -886,7 +903,8 @@ func (cm *contractManager) AddContractListener(ctx context.Context, listener *co
if err := cm.validateFFIEvent(ctx, &listener.Event.FFIEventDefinition); err != nil {
return nil, err
}
if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener); err != nil {

if err = cm.blockchain.AddContractListener(ctx, &listener.ContractListener, ""); err != nil {
return nil, err
}
if listener.Name == "" {
Expand Down
56 changes: 46 additions & 10 deletions internal/contracts/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func TestAddContractListenerInline(t *testing.T) {
mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil)
mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil)

result, err := cm.AddContractListener(context.Background(), sub)
Expand Down Expand Up @@ -816,7 +816,7 @@ func TestAddContractListenerInlineNilLocation(t *testing.T) {
mbi.On("AddContractListener", context.Background(), mock.MatchedBy(func(cl *core.ContractListener) bool {
// Normalize is not called for this case
return cl.Location == nil
})).Return(nil)
}), "").Return(nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil)

result, err := cm.AddContractListener(context.Background(), sub)
Expand Down Expand Up @@ -853,7 +853,7 @@ func TestAddContractListenerNoLocationOK(t *testing.T) {

mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil)

result, err := cm.AddContractListener(context.Background(), sub)
Expand Down Expand Up @@ -902,7 +902,7 @@ func TestAddContractListenerByEventPath(t *testing.T) {
mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil)
mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil)
mdi.On("GetFFIByID", context.Background(), "ns1", interfaceID).Return(&fftypes.FFI{}, nil)
mdi.On("GetFFIEvent", context.Background(), "ns1", interfaceID, sub.EventPath).Return(event, nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(nil)
Expand Down Expand Up @@ -1071,6 +1071,13 @@ func TestAddContractListenerVerifyOk(t *testing.T) {
fi, _ := f.Finalize()
return fi.Skip == 50 && fi.Limit == 50
})).Return([]*core.ContractListener{}, nil, nil).Once()
mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool {
fi, err := f.Finalize()
assert.NoError(t, err)
return fi.Limit == 1 && strings.Contains(fi.String(), "listener")
})).Return([]*core.BlockchainEvent{
{Namespace: "ns1", ID: fftypes.NewUUID(), ProtocolID: "001/002/003"},
}, nil, nil).Once()

mbi := cm.blockchain.(*blockchainmocks.Plugin)
mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil)
Expand All @@ -1079,7 +1086,7 @@ func TestAddContractListenerVerifyOk(t *testing.T) {
prevBackendID := l.BackendID
l.BackendID = "34567"
return prevBackendID == "23456"
})).Return(nil)
}), "001/002/003").Return(nil)

mdi.On("UpdateContractListener", ctx, "ns1", mock.Anything, mock.MatchedBy(func(u ffapi.Update) bool {
uu, _ := u.Finalize()
Expand All @@ -1106,6 +1113,7 @@ func TestAddContractListenerVerifyUpdateFail(t *testing.T) {
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"},
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"},
}, nil, nil).Once()
mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return([]*core.BlockchainEvent{}, nil, nil).Once()

mbi := cm.blockchain.(*blockchainmocks.Plugin)
mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil)
Expand All @@ -1114,7 +1122,7 @@ func TestAddContractListenerVerifyUpdateFail(t *testing.T) {
prevBackendID := l.BackendID
l.BackendID = "34567"
return prevBackendID == "23456"
})).Return(nil)
}), "").Return(nil)

mdi.On("UpdateContractListener", ctx, "ns1", mock.Anything, mock.MatchedBy(func(u ffapi.Update) bool {
uu, _ := u.Finalize()
Expand All @@ -1141,6 +1149,7 @@ func TestAddContractListenerVerifyAddFail(t *testing.T) {
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"},
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"},
}, nil, nil).Once()
mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return([]*core.BlockchainEvent{}, nil, nil).Once()

mbi := cm.blockchain.(*blockchainmocks.Plugin)
mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil)
Expand All @@ -1149,7 +1158,34 @@ func TestAddContractListenerVerifyAddFail(t *testing.T) {
prevBackendID := l.BackendID
l.BackendID = "34567"
return prevBackendID == "23456"
})).Return(fmt.Errorf("pop"))
}), "").Return(fmt.Errorf("pop"))

err := cm.verifyListeners(ctx)
assert.Regexp(t, "pop", err)

mdi.AssertExpectations(t)
mbi.AssertExpectations(t)
}

func TestAddContractListenerGetEventsFail(t *testing.T) {
cm := newTestContractManager()

ctx := context.Background()

mdi := cm.database.(*databasemocks.Plugin)
mdi.On("GetContractListeners", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool {
fi, _ := f.Finalize()
return fi.Skip == 0 && fi.Limit == 50
})).Return([]*core.ContractListener{
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "12345"},
{Namespace: "ns1", ID: fftypes.NewUUID(), BackendID: "23456"},
}, nil, nil).Once()
mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).
Return(nil, nil, fmt.Errorf("pop")).Once()

mbi := cm.blockchain.(*blockchainmocks.Plugin)
mbi.On("GetContractListenerStatus", ctx, "ns1", "12345", true).Return(true, struct{}{}, core.ContractListenerStatusSynced, nil)
mbi.On("GetContractListenerStatus", ctx, "ns1", "23456", true).Return(false, nil, core.ContractListenerStatusUnknown, nil)

err := cm.verifyListeners(ctx)
assert.Regexp(t, "pop", err)
Expand Down Expand Up @@ -1386,7 +1422,7 @@ func TestAddContractListenerBlockchainFail(t *testing.T) {
mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil)
mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(fmt.Errorf("pop"))
mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(fmt.Errorf("pop"))

_, err := cm.AddContractListener(context.Background(), sub)
assert.EqualError(t, err, "pop")
Expand Down Expand Up @@ -1423,7 +1459,7 @@ func TestAddContractListenerUpsertSubFail(t *testing.T) {
mbi.On("NormalizeContractLocation", context.Background(), blockchain.NormalizeListener, sub.Location).Return(sub.Location, nil)
mbi.On("GenerateEventSignature", context.Background(), mock.Anything).Return("changed")
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener).Return(nil)
mbi.On("AddContractListener", context.Background(), &sub.ContractListener, "").Return(nil)
mdi.On("InsertContractListener", context.Background(), &sub.ContractListener).Return(fmt.Errorf("pop"))

_, err := cm.AddContractListener(context.Background(), sub)
Expand Down Expand Up @@ -1464,7 +1500,7 @@ func TestAddContractAPIListener(t *testing.T) {
mdi.On("GetContractListeners", context.Background(), "ns1", mock.Anything).Return(nil, nil, nil)
mbi.On("AddContractListener", context.Background(), mock.MatchedBy(func(l *core.ContractListener) bool {
return *l.Interface.ID == *interfaceID && l.Topic == "test-topic"
})).Return(nil)
}), "").Return(nil)
mdi.On("InsertContractListener", context.Background(), mock.MatchedBy(func(l *core.ContractListener) bool {
return *l.Interface.ID == *interfaceID && l.Event.Name == "changed" && l.Topic == "test-topic"
})).Return(nil)
Expand Down
18 changes: 17 additions & 1 deletion internal/multiparty/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,23 @@ func (mm *multipartyManager) configureContractCommon(ctx context.Context, migrat
}
}

subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace, current)
// For the case that we're establishing a listener from "latest" we obtain the protocol ID
// of the latest event confirmed from the blockchain for a given subscription.
// This protocolID should be parsed and used by the blockchain plugin if SubOptsFirstEventNewest
// is passed through, and the listener does not exist.
fb := database.BlockchainEventQueryFactory.NewFilter(ctx).Sort("-protocolid").Limit(1)
latestEvents, _, err := mm.database.GetBlockchainEvents(ctx, mm.namespace.Name, fb.Eq(
"listener", nil,
))
if err != nil {
return err
}
lastProtocolID := ""
if len(latestEvents) > 0 {
lastProtocolID = latestEvents[0].ProtocolID
}

subID, err := mm.blockchain.AddFireflySubscription(ctx, mm.namespace, current, lastProtocolID)
if err == nil {
active.Location = current.Location
active.FirstEvent = current.FirstEvent
Expand Down
45 changes: 44 additions & 1 deletion internal/multiparty/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package multiparty
import (
"context"
"fmt"
"strings"
"testing"

"github.com/hyperledger/firefly-common/pkg/ffapi"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly/internal/txcommon"
"github.com/hyperledger/firefly/mocks/blockchainmocks"
Expand Down Expand Up @@ -115,8 +117,15 @@ func TestConfigureContract(t *testing.T) {
defer mp.cleanup(t)

mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, "001/002/003").Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.MatchedBy(func(f ffapi.Filter) bool {
fi, err := f.Finalize()
assert.NoError(t, err)
return fi.Limit == 1 && strings.Contains(fi.String(), "listener")
})).Return([]*core.BlockchainEvent{
{Namespace: "ns1", ID: fftypes.NewUUID(), ProtocolID: "001/002/003"},
}, nil, nil).Once()

mp.multipartyManager.config.Contracts = []blockchain.MultipartyContract{{
FirstEvent: "0",
Expand All @@ -141,6 +150,7 @@ func TestConfigureContractLocationChanged(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()

mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{
Active: &core.MultipartyContract{
Expand Down Expand Up @@ -168,6 +178,7 @@ func TestConfigureContractDeprecatedConfig(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()

err := mp.ConfigureContract(context.Background())

Expand Down Expand Up @@ -263,6 +274,7 @@ func TestSubmitNetworkAction(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()
mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(txid, nil)
mp.mbi.On("Name").Return("ut")
mp.mom.On("AddOrReuseOperation", context.Background(), mock.MatchedBy(func(op *core.Operation) bool {
Expand Down Expand Up @@ -306,6 +318,7 @@ func TestSubmitNetworkActionTXFail(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()
mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(nil, fmt.Errorf("pop"))

err := mp.ConfigureContract(context.Background())
Expand All @@ -317,6 +330,32 @@ func TestSubmitNetworkActionTXFail(t *testing.T) {
mp.mth.AssertExpectations(t)
}

func TestConfigureContractLookupBlockchainEventFail(t *testing.T) {
location := fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
}.String())

mp := newTestMultipartyManager()
defer mp.cleanup(t)

mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{
Active: &core.MultipartyContract{Index: 0},
}
mp.multipartyManager.config.Contracts = []blockchain.MultipartyContract{{
FirstEvent: "0",
Location: location,
}}

mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, fmt.Errorf("pop")).Once()

err := mp.ConfigureContract(context.Background())
assert.EqualError(t, err, "pop")

mp.mbi.AssertExpectations(t)
mp.mth.AssertExpectations(t)
}

func TestSubmitNetworkActionOpFail(t *testing.T) {
location := fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
Expand All @@ -337,6 +376,7 @@ func TestSubmitNetworkActionOpFail(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()
mp.mth.On("SubmitNewTransaction", mock.Anything, core.TransactionTypeNetworkAction, core.IdempotencyKey("")).Return(txid, nil)
mp.mbi.On("Name").Return("ut")
mp.mom.On("AddOrReuseOperation", context.Background(), mock.Anything).Return(fmt.Errorf("pop"))
Expand All @@ -362,6 +402,7 @@ func TestSubmitNetworkActionBadType(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()

mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{
Active: &core.MultipartyContract{Index: 0},
Expand Down Expand Up @@ -623,6 +664,7 @@ func TestGetNetworkVersion(t *testing.T) {
mp.mbi.On("GetNetworkVersion", mock.Anything, mock.Anything).Return(1, nil)
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil).Once()

mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{
Active: &core.MultipartyContract{Index: 0},
Expand Down Expand Up @@ -651,6 +693,7 @@ func TestConfgureAndTerminateContract(t *testing.T) {
mp.mbi.On("AddFireflySubscription", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return("test", nil)
mp.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
mp.mbi.On("RemoveFireflySubscription", mock.Anything, mock.Anything).Return(nil)
mp.mdi.On("GetBlockchainEvents", mock.Anything, "ns1", mock.Anything).Return(nil, nil, nil)

mp.multipartyManager.namespace.Contracts = &core.MultipartyContracts{
Active: &core.MultipartyContract{Index: 0},
Expand Down
2 changes: 1 addition & 1 deletion mocks/apiservermocks/ffi_swagger_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/apiservermocks/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/assetmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/batchmocks/manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/blockchaincommonmocks/firefly_subscriptions.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/blockchainmocks/callbacks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9b553ff

Please sign in to comment.