Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Nicko Guyer <[email protected]>
  • Loading branch information
nguyer committed Nov 15, 2023
1 parent aa64620 commit e0b34a5
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 60 deletions.
14 changes: 9 additions & 5 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func (e *Ethereum) StartNamespace(ctx context.Context, namespace string) (err er

e.closed[namespace] = make(chan struct{})

go e.eventLoop(namespace)
go e.eventLoop(namespace, e.wsconn[namespace], e.closed[namespace])

return nil
}
Expand Down Expand Up @@ -282,7 +282,12 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N
return "", err
}

sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID[namespace.Name], batchPinEventABI)
streamID, ok := e.streamID[namespace.Name]
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI)

if err != nil {
return "", err
}
Expand Down Expand Up @@ -447,11 +452,10 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, batchID int64, messag
return e.callbacks.DispatchBlockchainEvents(ctx, events)
}

func (e *Ethereum) eventLoop(namespace string) {
func (e *Ethereum) eventLoop(namespace string, wsconn wsclient.WSClient, closed chan struct{}) {
topic := e.getTopic(namespace)
wsconn := e.wsconn[namespace]
defer wsconn.Close()
defer close(e.closed[namespace])
defer close(closed)
l := log.L(e.ctx).WithField("role", "event-loop")
ctx := log.WithLogger(e.ctx, l)
log.L(ctx).Debugf("Starting event loop for namespace '%s'", namespace)
Expand Down
55 changes: 51 additions & 4 deletions internal/blockchain/ethereum/ethereum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ func TestEventLoopContextCancelled(t *testing.T) {
wsm.On("Receive").Return(r)
wsm.On("Close").Return()
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
wsm.AssertExpectations(t)
}

Expand All @@ -1760,7 +1760,7 @@ func TestEventLoopReceiveClosed(t *testing.T) {
wsm.On("Receive").Return((<-chan []byte)(r))
wsm.On("Close").Return()
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
wsm.AssertExpectations(t)
}

Expand All @@ -1778,7 +1778,7 @@ func TestEventLoopSendClosed(t *testing.T) {
}).Return(fmt.Errorf("pop"))
wsm.On("Close").Return()
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
wsm.AssertExpectations(t)
}

Expand Down Expand Up @@ -1959,7 +1959,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
close(done)
}

go e.eventLoop("ns1")
go e.eventLoop("ns1", wsm, e.closed["ns1"])
r <- []byte(`!badjson`) // ignored bad json
r <- []byte(`"not an object"`) // ignored wrong type
r <- data.Bytes()
Expand Down Expand Up @@ -3984,6 +3984,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) {
FirstEvent: "newest",
}

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
subID, err := e.AddFireflySubscription(e.ctx, ns, contract)
assert.NoError(t, err)
Expand Down Expand Up @@ -4031,12 +4032,56 @@ func TestAddFireflySubscriptionV1(t *testing.T) {
FirstEvent: "newest",
}

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.NoError(t, err)
assert.NotNil(t, e.subs.GetSubscription("sub1"))
}

func TestAddFireflySubscriptionEventstreamFail(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
resetConf(e)

mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sub1",
}))
httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1))

utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
utEthconnectConf.Set(EthconnectConfigTopic, "topic1")

cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil)
err := e.Init(e.ctx, e.cancelCtx, utConfig, e.metrics, cmi)
// assert.NoError(t, err)

location := fftypes.JSONAnyPtr(fftypes.JSONObject{
"address": "0x123",
}.String())
contract := &blockchain.MultipartyContract{
Location: location,
FirstEvent: "newest",
}

ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "FF10462", err)
}

func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {
e, cancel := newTestEthereum()
defer cancel()
Expand Down Expand Up @@ -4073,6 +4118,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {
FirstEvent: "oldest",
}

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "FF10111", err)
Expand Down Expand Up @@ -4156,6 +4202,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) {
FirstEvent: "oldest",
}

e.streamID["ns1"] = "es12345"
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "FF10111", err)
Expand Down
14 changes: 9 additions & 5 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (f *Fabric) StartNamespace(ctx context.Context, namespace string) (err erro

f.closed[namespace] = make(chan struct{})

go f.eventLoop(namespace)
go f.eventLoop(namespace, f.wsconn[namespace], f.closed[namespace])

return nil
}
Expand Down Expand Up @@ -433,7 +433,11 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Nam
fabricOnChainLocation.Chaincode = ""
}

sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, f.streamID[namespace.Name], batchPinEvent)
streamID, ok := f.streamID[namespace.Name]
if !ok {
return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found")
}
sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -496,13 +500,13 @@ func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{})
return f.callbacks.DispatchBlockchainEvents(ctx, events)
}

func (f *Fabric) eventLoop(namespace string) {
func (f *Fabric) eventLoop(namespace string, wsconn wsclient.WSClient, closed chan struct{}) {
topic := f.getTopic(namespace)
wsconn := f.wsconn[namespace]
defer wsconn.Close()
defer close(f.closed[namespace])
defer close(closed)
l := log.L(f.ctx).WithField("role", "event-loop")
ctx := log.WithLogger(f.ctx, l)
log.L(ctx).Debugf("Starting event loop for namespace '%s'", namespace)
for {
select {
case <-ctx.Done():
Expand Down
63 changes: 59 additions & 4 deletions internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,61 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) {
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
e.streamID["ns1"] = "es12345"
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.NoError(t, err)
}

func TestAddFireflySubscriptionEventstreamFail(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
resetConf(e)

httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams",
httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}}))
httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions",
httpmock.NewJsonResponderOrPanic(200, []subscription{}))
httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"),
mockNetworkVersion(t, 1))

httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`,
func(req *http.Request) (*http.Response, error) {
var body map[string]interface{}
json.NewDecoder(req.Body).Decode(&body)
assert.Equal(t, "firefly", body["channel"])
assert.Equal(t, nil, body["chaincode"])
return httpmock.NewJsonResponderOrPanic(200, body)(req)
})

mockedClient := &http.Client{}
httpmock.ActivateNonDefault(mockedClient)
defer httpmock.DeactivateAndReset()

utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345")
utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient)
utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly")
utFabconnectConf.Set(FabconnectConfigSigner, "signer001")
utFabconnectConf.Set(FabconnectConfigTopic, "topic1")

location := fftypes.JSONAnyPtr(fftypes.JSONObject{
"channel": "firefly",
"chaincode": "simplestorage",
}.String())
contract := &blockchain.MultipartyContract{
Location: location,
FirstEvent: "newest",
Options: fftypes.JSONAnyPtr(`{"customPinSupport":true}`),
}

cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil)
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "FF10462", err)
}

func TestAddFireflySubscriptionBadOptions(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
Expand Down Expand Up @@ -594,6 +645,7 @@ func TestAddFireflySubscriptionBadOptions(t *testing.T) {
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
e.streamID["ns1"] = "es12345"
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "pop", err)
}
Expand Down Expand Up @@ -634,6 +686,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) {
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
e.streamID["ns1"] = "es12345"
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "pop", err)
}
Expand Down Expand Up @@ -1037,6 +1090,7 @@ func TestSubQueryCreateError(t *testing.T) {
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
e.streamID["ns1"] = "es12345"
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.Regexp(t, "FF10284.*pop", err)

Expand Down Expand Up @@ -1084,6 +1138,7 @@ func TestSubQueryCreate(t *testing.T) {
err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi)
assert.NoError(t, err)
ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"}
e.streamID["ns1"] = "es12345"
_, err = e.AddFireflySubscription(e.ctx, ns, contract)
assert.NoError(t, err)

Expand Down Expand Up @@ -1634,7 +1689,7 @@ func TestEventLoopContextCancelled(t *testing.T) {
wsm.On("Receive").Return(r)
wsm.On("Close").Return()
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
}

func TestEventLoopReceiveClosed(t *testing.T) {
Expand All @@ -1647,7 +1702,7 @@ func TestEventLoopReceiveClosed(t *testing.T) {
wsm.On("Receive").Return((<-chan []byte)(r))
wsm.On("Close").Return()
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
}

func TestEventLoopSendClosed(t *testing.T) {
Expand All @@ -1664,7 +1719,7 @@ func TestEventLoopSendClosed(t *testing.T) {
close(r)
})
e.closed["ns1"] = make(chan struct{})
e.eventLoop("ns1") // we're simply looking for it exiting
e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting
wsm.AssertExpectations(t)
}

Expand Down Expand Up @@ -1706,7 +1761,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) {
close(done)
}

go e.eventLoop("ns1")
go e.eventLoop("ns1", wsm, e.closed["ns1"])
r <- []byte(`!badjson`) // ignored bad json
r <- []byte(`"not an object"`) // ignored wrong type
r <- data
Expand Down
1 change: 1 addition & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,5 @@ var (
MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409)
MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400)
MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400)
MsgInternalServerError = ffe("FF10462", "Internal server error: %s", 500)
)
2 changes: 0 additions & 2 deletions internal/namespace/configreload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,6 @@ func mockPurge(nmm *nmMocks, nsName string) {
nmm.mdi.On("SetHandler", nsName, matchNil).Return()
nmm.mbi.On("SetHandler", nsName, matchNil).Return()
nmm.mbi.On("SetOperationHandler", nsName, matchNil).Return()
nmm.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(nil)
nmm.mps.On("SetHandler", nsName, matchNil).Return().Maybe()
nmm.mps.On("SetOperationHandler", nsName, matchNil).Return().Maybe()
nmm.mdx.On("SetHandler", nsName, mock.Anything, matchNil).Return().Maybe()
Expand All @@ -817,7 +816,6 @@ func mockPurge(nmm *nmMocks, nsName string) {
mti.On("SetHandler", nsName, matchNil).Return().Maybe()
mti.On("SetOperationHandler", nsName, matchNil).Return().Maybe()
mti.On("StartNamespace", mock.Anything, nsName).Return(nil).Maybe()
mti.On("StopNamespace", mock.Anything, nsName).Return(nil).Maybe()
}
}

Expand Down
23 changes: 0 additions & 23 deletions internal/namespace/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,8 +1852,6 @@ func TestStart(t *testing.T) {
waitInit := namespaceInitWaiter(t, nmm, []string{"default"})

nmm.mdx.On("Start", mock.Anything).Return(nil)
// nmm.mti[0].On("StartNamespace", mock.Anything, "default").Return(nil)
// nmm.mti[1].On("StartNamespace", mock.Anything, "default").Return(nil)
nmm.mdi.On("GetNamespace", mock.Anything, "default").Return(nil, nil)
nmm.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
nmm.mo.On("PreInit", mock.Anything, mock.Anything).Return(nil)
Expand Down Expand Up @@ -1882,27 +1880,6 @@ func TestStartDataExchangeFail(t *testing.T) {

}

// func TestStartTokensFail(t *testing.T) {
// nm, nmm, cleanup := newTestNamespaceManager(t, true)
// defer cleanup()

// // nm.namespaces = nil
// nmm.mdi.On("GetNamespace", mock.Anything, "default").Return(nil, nil)
// nmm.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil)
// nmm.mo.On("PreInit", mock.Anything, mock.Anything).Return()
// nmm.mo.On("Init").Return(nil)
// nmm.mo.On("Start").Return(nil)
// nmm.mti[0].On("StartNamespace", mock.Anything, "default").Return(fmt.Errorf("pop"))

// // nmm.mti[1].On("StartNamespace", mock.Anything, "default").Return(fmt.Errorf("pop"))

// err := nm.startNamespacesAndPlugins(nm.namespaces, map[string]*plugin{
// "erc721": nm.plugins["erc721"],
// })
// assert.EqualError(t, err, "pop")

// }

func TestStartOrchestratorFail(t *testing.T) {
nm, nmm, cleanup := newTestNamespaceManager(t, true)
defer cleanup()
Expand Down
Loading

0 comments on commit e0b34a5

Please sign in to comment.