From e0b34a56833606a23f4a0c4cb375e2683d4b84bc Mon Sep 17 00:00:00 2001 From: Nicko Guyer Date: Wed, 15 Nov 2023 14:36:56 -0500 Subject: [PATCH] Address PR feedback Signed-off-by: Nicko Guyer --- internal/blockchain/ethereum/ethereum.go | 14 +++-- internal/blockchain/ethereum/ethereum_test.go | 55 ++++++++++++++-- internal/blockchain/fabric/fabric.go | 14 +++-- internal/blockchain/fabric/fabric_test.go | 63 +++++++++++++++++-- internal/coremsgs/en_error_messages.go | 1 + internal/namespace/configreload_test.go | 2 - internal/namespace/manager_test.go | 23 ------- internal/orchestrator/orchestrator.go | 20 +++--- internal/orchestrator/orchestrator_test.go | 33 +++++++--- 9 files changed, 165 insertions(+), 60 deletions(-) diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index fbcfd8b4a..0c910697c 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -242,7 +242,7 @@ func (e *Ethereum) StartNamespace(ctx context.Context, namespace string) (err er e.closed[namespace] = make(chan struct{}) - go e.eventLoop(namespace) + go e.eventLoop(namespace, e.wsconn[namespace], e.closed[namespace]) return nil } @@ -282,7 +282,12 @@ func (e *Ethereum) AddFireflySubscription(ctx context.Context, namespace *core.N return "", err } - sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, e.streamID[namespace.Name], batchPinEventABI) + streamID, ok := e.streamID[namespace.Name] + if !ok { + return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") + } + sub, err := e.streams.ensureFireFlySubscription(ctx, namespace.Name, version, ethLocation.Address, contract.FirstEvent, streamID, batchPinEventABI) + if err != nil { return "", err } @@ -447,11 +452,10 @@ func (e *Ethereum) handleMessageBatch(ctx context.Context, batchID int64, messag return e.callbacks.DispatchBlockchainEvents(ctx, events) } -func (e *Ethereum) eventLoop(namespace string) { +func (e *Ethereum) eventLoop(namespace string, wsconn wsclient.WSClient, closed chan struct{}) { topic := e.getTopic(namespace) - wsconn := e.wsconn[namespace] defer wsconn.Close() - defer close(e.closed[namespace]) + defer close(closed) l := log.L(e.ctx).WithField("role", "event-loop") ctx := log.WithLogger(e.ctx, l) log.L(ctx).Debugf("Starting event loop for namespace '%s'", namespace) diff --git a/internal/blockchain/ethereum/ethereum_test.go b/internal/blockchain/ethereum/ethereum_test.go index c90591a0c..3ce3ba09d 100644 --- a/internal/blockchain/ethereum/ethereum_test.go +++ b/internal/blockchain/ethereum/ethereum_test.go @@ -1746,7 +1746,7 @@ func TestEventLoopContextCancelled(t *testing.T) { wsm.On("Receive").Return(r) wsm.On("Close").Return() e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting wsm.AssertExpectations(t) } @@ -1760,7 +1760,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm.On("Receive").Return((<-chan []byte)(r)) wsm.On("Close").Return() e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting wsm.AssertExpectations(t) } @@ -1778,7 +1778,7 @@ func TestEventLoopSendClosed(t *testing.T) { }).Return(fmt.Errorf("pop")) wsm.On("Close").Return() e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting wsm.AssertExpectations(t) } @@ -1959,7 +1959,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) { close(done) } - go e.eventLoop("ns1") + go e.eventLoop("ns1", wsm, e.closed["ns1"]) r <- []byte(`!badjson`) // ignored bad json r <- []byte(`"not an object"`) // ignored wrong type r <- data.Bytes() @@ -3984,6 +3984,7 @@ func TestAddAndRemoveFireflySubscription(t *testing.T) { FirstEvent: "newest", } + e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} subID, err := e.AddFireflySubscription(e.ctx, ns, contract) assert.NoError(t, err) @@ -4031,12 +4032,56 @@ func TestAddFireflySubscriptionV1(t *testing.T) { FirstEvent: "newest", } + e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.NoError(t, err) assert.NotNil(t, e.subs.GetSubscription("sub1")) } +func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { + e, cancel := newTestEthereum() + defer cancel() + resetConf(e) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, eventStream{ID: "es12345"})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sub1", + })) + httpmock.RegisterResponder("POST", "http://localhost:12345/", mockNetworkVersion(t, 1)) + + utEthconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utEthconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utEthconnectConf.Set(EthconnectConfigTopic, "topic1") + + cmi := &cachemocks.Manager{} + cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil) + err := e.Init(e.ctx, e.cancelCtx, utConfig, e.metrics, cmi) + // assert.NoError(t, err) + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "address": "0x123", + }.String()) + contract := &blockchain.MultipartyContract{ + Location: location, + FirstEvent: "newest", + } + + ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + _, err = e.AddFireflySubscription(e.ctx, ns, contract) + assert.Regexp(t, "FF10462", err) +} + func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { e, cancel := newTestEthereum() defer cancel() @@ -4073,6 +4118,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { FirstEvent: "oldest", } + e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.Regexp(t, "FF10111", err) @@ -4156,6 +4202,7 @@ func TestAddFireflySubscriptionGetVersionError(t *testing.T) { FirstEvent: "oldest", } + e.streamID["ns1"] = "es12345" ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.Regexp(t, "FF10111", err) diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index bab973035..0a831ced9 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -288,7 +288,7 @@ func (f *Fabric) StartNamespace(ctx context.Context, namespace string) (err erro f.closed[namespace] = make(chan struct{}) - go f.eventLoop(namespace) + go f.eventLoop(namespace, f.wsconn[namespace], f.closed[namespace]) return nil } @@ -433,7 +433,11 @@ func (f *Fabric) AddFireflySubscription(ctx context.Context, namespace *core.Nam fabricOnChainLocation.Chaincode = "" } - sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, f.streamID[namespace.Name], batchPinEvent) + streamID, ok := f.streamID[namespace.Name] + if !ok { + return "", i18n.NewError(ctx, coremsgs.MsgInternalServerError, "eventstream ID not found") + } + sub, err := f.streams.ensureFireFlySubscription(ctx, namespace.Name, version, fabricOnChainLocation, contract.FirstEvent, streamID, batchPinEvent) if err != nil { return "", err } @@ -496,13 +500,13 @@ func (f *Fabric) handleMessageBatch(ctx context.Context, messages []interface{}) return f.callbacks.DispatchBlockchainEvents(ctx, events) } -func (f *Fabric) eventLoop(namespace string) { +func (f *Fabric) eventLoop(namespace string, wsconn wsclient.WSClient, closed chan struct{}) { topic := f.getTopic(namespace) - wsconn := f.wsconn[namespace] defer wsconn.Close() - defer close(f.closed[namespace]) + defer close(closed) l := log.L(f.ctx).WithField("role", "event-loop") ctx := log.WithLogger(f.ctx, l) + log.L(ctx).Debugf("Starting event loop for namespace '%s'", namespace) for { select { case <-ctx.Done(): diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 9b6f82038..11493e64c 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -553,10 +553,61 @@ func TestAddFireflySubscriptionGlobal(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + e.streamID["ns1"] = "es12345" _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.NoError(t, err) } +func TestAddFireflySubscriptionEventstreamFail(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + resetConf(e) + + httpmock.RegisterResponder("GET", "http://localhost:12345/eventstreams", + httpmock.NewJsonResponderOrPanic(200, []eventStream{{ID: "es12345", Name: "topic1"}})) + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions", + httpmock.NewJsonResponderOrPanic(200, []subscription{})) + httpmock.RegisterResponder("POST", fmt.Sprintf("http://localhost:12345/query"), + mockNetworkVersion(t, 1)) + + httpmock.RegisterResponder("POST", `http://localhost:12345/subscriptions`, + func(req *http.Request) (*http.Response, error) { + var body map[string]interface{} + json.NewDecoder(req.Body).Decode(&body) + assert.Equal(t, "firefly", body["channel"]) + assert.Equal(t, nil, body["chaincode"]) + return httpmock.NewJsonResponderOrPanic(200, body)(req) + }) + + mockedClient := &http.Client{} + httpmock.ActivateNonDefault(mockedClient) + defer httpmock.DeactivateAndReset() + + utFabconnectConf.Set(ffresty.HTTPConfigURL, "http://localhost:12345") + utFabconnectConf.Set(ffresty.HTTPCustomClient, mockedClient) + utFabconnectConf.Set(FabconnectConfigChaincodeDeprecated, "firefly") + utFabconnectConf.Set(FabconnectConfigSigner, "signer001") + utFabconnectConf.Set(FabconnectConfigTopic, "topic1") + + location := fftypes.JSONAnyPtr(fftypes.JSONObject{ + "channel": "firefly", + "chaincode": "simplestorage", + }.String()) + contract := &blockchain.MultipartyContract{ + Location: location, + FirstEvent: "newest", + Options: fftypes.JSONAnyPtr(`{"customPinSupport":true}`), + } + + cmi := &cachemocks.Manager{} + cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(e.ctx, 100, 5*time.Minute), nil) + err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) + assert.NoError(t, err) + ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + _, err = e.AddFireflySubscription(e.ctx, ns, contract) + assert.Regexp(t, "FF10462", err) +} + func TestAddFireflySubscriptionBadOptions(t *testing.T) { e, cancel := newTestFabric() defer cancel() @@ -594,6 +645,7 @@ func TestAddFireflySubscriptionBadOptions(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + e.streamID["ns1"] = "es12345" _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.Regexp(t, "pop", err) } @@ -634,6 +686,7 @@ func TestAddFireflySubscriptionQuerySubsFail(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + e.streamID["ns1"] = "es12345" _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.Regexp(t, "pop", err) } @@ -1037,6 +1090,7 @@ func TestSubQueryCreateError(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + e.streamID["ns1"] = "es12345" _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.Regexp(t, "FF10284.*pop", err) @@ -1084,6 +1138,7 @@ func TestSubQueryCreate(t *testing.T) { err := e.Init(e.ctx, e.cancelCtx, utConfig, &metricsmocks.Manager{}, cmi) assert.NoError(t, err) ns := &core.Namespace{Name: "ns1", NetworkName: "ns1"} + e.streamID["ns1"] = "es12345" _, err = e.AddFireflySubscription(e.ctx, ns, contract) assert.NoError(t, err) @@ -1634,7 +1689,7 @@ func TestEventLoopContextCancelled(t *testing.T) { wsm.On("Receive").Return(r) wsm.On("Close").Return() e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting } func TestEventLoopReceiveClosed(t *testing.T) { @@ -1647,7 +1702,7 @@ func TestEventLoopReceiveClosed(t *testing.T) { wsm.On("Receive").Return((<-chan []byte)(r)) wsm.On("Close").Return() e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting } func TestEventLoopSendClosed(t *testing.T) { @@ -1664,7 +1719,7 @@ func TestEventLoopSendClosed(t *testing.T) { close(r) }) e.closed["ns1"] = make(chan struct{}) - e.eventLoop("ns1") // we're simply looking for it exiting + e.eventLoop("ns1", wsm, e.closed["ns1"]) // we're simply looking for it exiting wsm.AssertExpectations(t) } @@ -1706,7 +1761,7 @@ func TestEventLoopUnexpectedMessage(t *testing.T) { close(done) } - go e.eventLoop("ns1") + go e.eventLoop("ns1", wsm, e.closed["ns1"]) r <- []byte(`!badjson`) // ignored bad json r <- []byte(`"not an object"`) // ignored wrong type r <- data diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index c7a87ff0d..9e4148a24 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -300,4 +300,5 @@ var ( MsgTokensRESTErrConflict = ffe("FF10459", "Conflict from tokens service: %s", 409) MsgBatchWithDataNotSupported = ffe("FF10460", "Provided subscription '%s' enables batching and withData which is not supported", 400) MsgBatchDeliveryNotSupported = ffe("FF10461", "Batch delivery not supported by transport '%s'", 400) + MsgInternalServerError = ffe("FF10462", "Internal server error: %s", 500) ) diff --git a/internal/namespace/configreload_test.go b/internal/namespace/configreload_test.go index 5719dceba..9afd00b91 100644 --- a/internal/namespace/configreload_test.go +++ b/internal/namespace/configreload_test.go @@ -808,7 +808,6 @@ func mockPurge(nmm *nmMocks, nsName string) { nmm.mdi.On("SetHandler", nsName, matchNil).Return() nmm.mbi.On("SetHandler", nsName, matchNil).Return() nmm.mbi.On("SetOperationHandler", nsName, matchNil).Return() - nmm.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(nil) nmm.mps.On("SetHandler", nsName, matchNil).Return().Maybe() nmm.mps.On("SetOperationHandler", nsName, matchNil).Return().Maybe() nmm.mdx.On("SetHandler", nsName, mock.Anything, matchNil).Return().Maybe() @@ -817,7 +816,6 @@ func mockPurge(nmm *nmMocks, nsName string) { mti.On("SetHandler", nsName, matchNil).Return().Maybe() mti.On("SetOperationHandler", nsName, matchNil).Return().Maybe() mti.On("StartNamespace", mock.Anything, nsName).Return(nil).Maybe() - mti.On("StopNamespace", mock.Anything, nsName).Return(nil).Maybe() } } diff --git a/internal/namespace/manager_test.go b/internal/namespace/manager_test.go index 1338afa35..9dce67d2b 100644 --- a/internal/namespace/manager_test.go +++ b/internal/namespace/manager_test.go @@ -1852,8 +1852,6 @@ func TestStart(t *testing.T) { waitInit := namespaceInitWaiter(t, nmm, []string{"default"}) nmm.mdx.On("Start", mock.Anything).Return(nil) - // nmm.mti[0].On("StartNamespace", mock.Anything, "default").Return(nil) - // nmm.mti[1].On("StartNamespace", mock.Anything, "default").Return(nil) nmm.mdi.On("GetNamespace", mock.Anything, "default").Return(nil, nil) nmm.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) nmm.mo.On("PreInit", mock.Anything, mock.Anything).Return(nil) @@ -1882,27 +1880,6 @@ func TestStartDataExchangeFail(t *testing.T) { } -// func TestStartTokensFail(t *testing.T) { -// nm, nmm, cleanup := newTestNamespaceManager(t, true) -// defer cleanup() - -// // nm.namespaces = nil -// nmm.mdi.On("GetNamespace", mock.Anything, "default").Return(nil, nil) -// nmm.mdi.On("UpsertNamespace", mock.Anything, mock.AnythingOfType("*core.Namespace"), true).Return(nil) -// nmm.mo.On("PreInit", mock.Anything, mock.Anything).Return() -// nmm.mo.On("Init").Return(nil) -// nmm.mo.On("Start").Return(nil) -// nmm.mti[0].On("StartNamespace", mock.Anything, "default").Return(fmt.Errorf("pop")) - -// // nmm.mti[1].On("StartNamespace", mock.Anything, "default").Return(fmt.Errorf("pop")) - -// err := nm.startNamespacesAndPlugins(nm.namespaces, map[string]*plugin{ -// "erc721": nm.plugins["erc721"], -// }) -// assert.EqualError(t, err, "pop") - -// } - func TestStartOrchestratorFail(t *testing.T) { nm, nmm, cleanup := newTestNamespaceManager(t, true) defer cleanup() diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 1a5ac6591..25209859e 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -249,16 +249,6 @@ func (or *orchestrator) Init() (err error) { func Purge(ctx context.Context, ns *core.Namespace, plugins *Plugins, dxNodeName string) { // Clear all handlers on all plugins, as this namespace is never coming back setHandlers(ctx, plugins, ns, dxNodeName, nil, nil) - err := plugins.Blockchain.Plugin.StopNamespace(ctx, ns.Name) - if err != nil { - log.L(ctx).Errorf("Error purging namespace '%s' from blockchain plugin '%s': %s", ns.Name, plugins.Blockchain.Name, err.Error()) - } - for _, t := range plugins.Tokens { - err := t.Plugin.StopNamespace(ctx, ns.Name) - if err != nil { - log.L(ctx).Errorf("Error purging namespace '%s' from tokens plugin '%s': %s", ns.Name, t.Name, err.Error()) - } - } } func (or *orchestrator) database() database.Plugin { @@ -314,6 +304,16 @@ func (or *orchestrator) WaitStop() { if !or.started { return } + err := or.plugins.Blockchain.Plugin.StopNamespace(or.ctx, or.namespace.Name) + if err != nil { + log.L(or.ctx).Errorf("Error purging namespace '%s' from blockchain plugin '%s': %s", or.namespace.Name, or.plugins.Blockchain.Name, err.Error()) + } + for _, t := range or.plugins.Tokens { + err := t.Plugin.StopNamespace(or.ctx, or.namespace.Name) + if err != nil { + log.L(or.ctx).Errorf("Error purging namespace '%s' from tokens plugin '%s': %s", or.namespace.Name, t.Name, err.Error()) + } + } if or.batch != nil { or.batch.WaitStop() or.batch = nil diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 50127524f..511afbed5 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -231,7 +231,6 @@ func TestInitOK(t *testing.T) { or.mbi.On("SetHandler", "ns", mock.Anything).Return() or.mbi.On("SetOperationHandler", "ns", mock.Anything).Return() or.mbi.On("StartNamespace", mock.Anything, "ns").Return(nil) - // or.mti.On("StartNamespace", mock.Anything, "ns", mock.Anything).Return(nil) or.mdi.On("GetIdentities", mock.Anything, "ns", mock.Anything).Return([]*core.Identity{node}, nil, nil) or.mdx.On("SetHandler", "ns2", "node1", mock.Anything).Return() or.mdx.On("SetOperationHandler", "ns", mock.Anything).Return() @@ -503,10 +502,34 @@ func TestStartStopOk(t *testing.T) { or.mom.On("WaitStop").Return(nil) or.mem.On("WaitStop").Return(nil) or.mtw.On("Close").Return(nil) + or.mbi.On("StopNamespace", mock.Anything, "ns").Return(nil) + or.mti.On("StopNamespace", mock.Anything, "ns").Return(nil) err := or.Start() assert.NoError(t, err) or.WaitStop() or.WaitStop() // swallows dups + + or = newTestOrchestrator() + or.mdm.On("Start").Return(nil) + or.mba.On("Start").Return(nil) + or.mem.On("Start").Return(nil) + or.mbm.On("Start").Return(nil) + or.msd.On("Start").Return(nil) + or.mom.On("Start").Return(nil) + or.mtw.On("Start").Return() + or.mba.On("WaitStop").Return(nil) + or.mbm.On("WaitStop").Return(nil) + or.mdm.On("WaitStop").Return(nil) + or.msd.On("WaitStop").Return(nil) + or.mom.On("WaitStop").Return(nil) + or.mem.On("WaitStop").Return(nil) + or.mtw.On("Close").Return(nil) + or.mbi.On("StopNamespace", mock.Anything, "ns").Return(fmt.Errorf("pop")) + or.mti.On("StopNamespace", mock.Anything, "ns").Return(fmt.Errorf("pop")) + err = or.Start() + assert.NoError(t, err) + or.WaitStop() + or.WaitStop() // swallows dups } func TestPurge(t *testing.T) { @@ -517,13 +540,13 @@ func TestPurge(t *testing.T) { or.mdi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(nil) + // or.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(nil) or.mps.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mdx.On("SetHandler", mock.Anything, "Test1", mock.Anything).Return(nil) or.mdx.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mti.On("StopNamespace", mock.Anything, "ns").Return(nil) + // or.mti.On("StopNamespace", mock.Anything, "ns").Return(nil) Purge(context.Background(), or.namespace, or.plugins, "Test1") } @@ -535,13 +558,11 @@ func TestPurgeBlockchainError(t *testing.T) { or.mdi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(fmt.Errorf("pop")) or.mps.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mdx.On("SetHandler", mock.Anything, "Test1", mock.Anything).Return(nil) or.mdx.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mti.On("StopNamespace", mock.Anything, "ns").Return(nil) Purge(context.Background(), or.namespace, or.plugins, "Test1") } @@ -553,13 +574,11 @@ func TestPurgeTokenError(t *testing.T) { or.mdi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mbi.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mbi.On("StopNamespace", mock.Anything, mock.Anything).Return(nil) or.mps.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mdx.On("SetHandler", mock.Anything, "Test1", mock.Anything).Return(nil) or.mdx.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetHandler", mock.Anything, mock.Anything).Return(nil) or.mti.On("SetOperationHandler", mock.Anything, mock.Anything).Return(nil) - or.mti.On("StopNamespace", mock.Anything, "ns").Return(fmt.Errorf("pop")) Purge(context.Background(), or.namespace, or.plugins, "Test1") }