Skip to content

Commit

Permalink
Merge pull request #1546 from hyperledger/fix_fabric_listeners
Browse files Browse the repository at this point in the history
fix: fabric check existence of listener
  • Loading branch information
peterbroadhurst authored Jul 25, 2024
2 parents ebf2349 + e479668 commit 003ef7a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 6 deletions.
9 changes: 6 additions & 3 deletions internal/blockchain/fabric/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,25 @@ func (s *streamManager) getSubscriptions(ctx context.Context) (subs []*subscript
return subs, nil
}

func (s *streamManager) getSubscription(ctx context.Context, subID string) (sub *subscription, err error) {
func (s *streamManager) getSubscription(ctx context.Context, subID string, okNotFound bool) (sub *subscription, err error) {
res, err := s.client.R().
SetContext(ctx).
SetResult(&sub).
Get(fmt.Sprintf("/subscriptions/%s", subID))
if err != nil || !res.IsSuccess() {
if okNotFound && res.StatusCode() == 404 {
return nil, nil
}
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgFabconnectRESTErr)
}
return sub, nil
}

func (s *streamManager) getSubscriptionName(ctx context.Context, subID string) (string, error) {
func (s *streamManager) getSubscriptionName(ctx context.Context, subID string, okNotFound bool) (string, error) {
if cachedValue := s.cache.GetString("sub:" + subID); cachedValue != "" {
return cachedValue, nil
}
sub, err := s.getSubscription(ctx, subID)
sub, err := s.getSubscription(ctx, subID, okNotFound)
if err != nil {
return "", err
}
Expand Down
10 changes: 8 additions & 2 deletions internal/blockchain/fabric/fabric.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (f *Fabric) buildEventLocationString(chaincode string) string {

func (f *Fabric) processContractEvent(ctx context.Context, events common.EventsToDispatch, msgJSON fftypes.JSONObject) (err error) {
subID := msgJSON.GetString("subId")
subName, err := f.streams.getSubscriptionName(ctx, subID)
subName, err := f.streams.getSubscriptionName(ctx, subID, false)
if err != nil {
return err // this is a problem - we should be able to find the listener that dispatched this to us
}
Expand Down Expand Up @@ -998,7 +998,13 @@ func (f *Fabric) DeleteContractListener(ctx context.Context, subscription *core.

func (f *Fabric) GetContractListenerStatus(ctx context.Context, namespace, subID string, okNotFound bool) (bool, interface{}, core.ContractListenerStatus, error) {
// Fabconnect does not currently provide any additional status info for listener subscriptions.
return true, nil, core.ContractListenerStatusUnknown, nil
// But we check for existence of the subscription
sub, err := f.streams.getSubscription(ctx, subID, okNotFound)
if err != nil || sub == nil {
return false, nil, core.ContractListenerStatusUnknown, err
}

return true, nil, core.ContractListenerStatusUnknown, err
}

func (f *Fabric) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamValidator, error) {
Expand Down
52 changes: 51 additions & 1 deletion internal/blockchain/fabric/fabric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3360,12 +3360,62 @@ func TestGetContractListenerStatus(t *testing.T) {
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

_, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
e.streams = &streamManager{
client: e.client,
}

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
httpmock.NewJsonResponderOrPanic(200, subscription{
ID: "sb-cb37cc07-e873-4f58-44ab-55add6bba320", Stream: "es12345", Name: "ff-sub-ns1-11232312312",
}))

found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
assert.True(t, found)
assert.Nil(t, detail)
assert.Equal(t, core.ContractListenerStatusUnknown, status)
assert.NoError(t, err)
}

func TestGetContractListenerStatusNotFound(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

e.streams = &streamManager{
client: e.client,
}

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
httpmock.NewJsonResponderOrPanic(404, nil))

found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
assert.False(t, found)
assert.Nil(t, detail)
assert.Equal(t, core.ContractListenerStatusUnknown, status)
assert.NoError(t, err)
}

func TestGetContractListenerStatusError(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
httpmock.ActivateNonDefault(e.client.GetClient())
defer httpmock.DeactivateAndReset()

e.streams = &streamManager{
client: e.client,
}

httpmock.RegisterResponder("GET", "http://localhost:12345/subscriptions/id",
httpmock.NewJsonResponderOrPanic(500, nil))

found, detail, status, err := e.GetContractListenerStatus(context.Background(), "ns1", "id", true)
assert.False(t, found)
assert.Nil(t, detail)
assert.Equal(t, core.ContractListenerStatusUnknown, status)
assert.Error(t, err)
}

func TestGetTransactionStatus(t *testing.T) {
e, cancel := newTestFabric()
defer cancel()
Expand Down

0 comments on commit 003ef7a

Please sign in to comment.