diff --git a/internal/blockchain/fabric/eventstream.go b/internal/blockchain/fabric/eventstream.go index 3b38142b4..48d1b7271 100644 --- a/internal/blockchain/fabric/eventstream.go +++ b/internal/blockchain/fabric/eventstream.go @@ -156,22 +156,25 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript return subs, nil } -func (s *streamManager) getSubscription(ctx context.Context, subID string) (sub *subscription, err error) { +func (s *streamManager) getSubscription(ctx context.Context, subID string, okNotFound bool) (sub *subscription, err error) { res, err := s.client.R(). SetContext(ctx). SetResult(&sub). Get(fmt.Sprintf("/subscriptions/%s", subID)) if err != nil || !res.IsSuccess() { + if okNotFound && res.StatusCode() == 404 { + return nil, nil + } return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr) } return sub, nil } -func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) { +func (s *streamManager) getSubscriptionName(ctx context.Context, subID string, okNotFound bool) (string, error) { if cachedValue := s.cache.GetString("sub:" + subID); cachedValue != "" { return cachedValue, nil } - sub, err := s.getSubscription(ctx, subID) + sub, err := s.getSubscription(ctx, subID, okNotFound) if err != nil { return "", err } diff --git a/internal/blockchain/fabric/fabric.go b/internal/blockchain/fabric/fabric.go index 7729aaac1..574e5d8e9 100644 --- a/internal/blockchain/fabric/fabric.go +++ b/internal/blockchain/fabric/fabric.go @@ -396,7 +396,7 @@ func (f *Fabric) buildEventLocationString(chaincode string) string { func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsToDispatch, msgJSON fftypes.JSONObject) (err error) { subID := msgJSON.GetString("subId") - subName, err := f.streams.getSubscriptionName(ctx, subID) + subName, err := f.streams.getSubscriptionName(ctx, subID, false) if err != nil { return err // this is a problem - we should be able to find the listener that dispatched this to us } @@ -998,7 +998,13 @@ func (f *Fabric) DeleteContractListener(ctx context.Context, subscription *core. func (f *Fabric) GetContractListenerStatus(ctx context.Context, namespace, subID string, okNotFound bool) (bool, interface{}, core.ContractListenerStatus, error) { // Fabconnect does not currently provide any additional status info for listener subscriptions. - return true, nil, core.ContractListenerStatusUnknown, nil + // But we check for existence of the subscription + sub, err := f.streams.getSubscription(ctx, subID, okNotFound) + if err != nil || sub == nil { + return false, nil, core.ContractListenerStatusUnknown, err + } + + return true, nil, core.ContractListenerStatusUnknown, err } func (f *Fabric) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamValidator, error) { diff --git a/internal/blockchain/fabric/fabric_test.go b/internal/blockchain/fabric/fabric_test.go index 11799546c..a839fa9ee 100644 --- a/internal/blockchain/fabric/fabric_test.go +++ b/internal/blockchain/fabric/fabric_test.go @@ -3360,12 +3360,62 @@ func TestGetContractListenerStatus(t *testing.T) { httpmock.ActivateNonDefault(e.client.GetClient()) defer httpmock.DeactivateAndReset() - _, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true) + e.streams = &streamManager{ + client: e.client, + } + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id", + httpmock.NewJsonResponderOrPanic(200, subscription{ + ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312", + })) + + found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true) + assert.True(t, found) + assert.Nil(t, detail) + assert.Equal(t, core.ContractListenerStatusUnknown, status) + assert.NoError(t, err) +} + +func TestGetContractListenerStatusNotFound(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + e.streams = &streamManager{ + client: e.client, + } + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id", + httpmock.NewJsonResponderOrPanic(404, nil)) + + found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true) + assert.False(t, found) assert.Nil(t, detail) assert.Equal(t, core.ContractListenerStatusUnknown, status) assert.NoError(t, err) } +func TestGetContractListenerStatusError(t *testing.T) { + e, cancel := newTestFabric() + defer cancel() + httpmock.ActivateNonDefault(e.client.GetClient()) + defer httpmock.DeactivateAndReset() + + e.streams = &streamManager{ + client: e.client, + } + + httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id", + httpmock.NewJsonResponderOrPanic(500, nil)) + + found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true) + assert.False(t, found) + assert.Nil(t, detail) + assert.Equal(t, core.ContractListenerStatusUnknown, status) + assert.Error(t, err) +} + func TestGetTransactionStatus(t *testing.T) { e, cancel := newTestFabric() defer cancel()