From cb2fa7dc7a84da84568fa7865e1e388c0f46f2cb Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv <u.andrukhiv@gmail.com> Date: Mon, 13 Jan 2025 12:14:58 +0200 Subject: [PATCH 1/9] Added arguments getter for data providers, filled ListSubscriptionsMessageResponse with Arguments, updated unit tests --- engine/access/rest/websockets/controller.go | 1 + .../account_statuses_provider.go | 1 + .../data_providers/base_provider.go | 9 ++++++++ .../data_providers/block_digests_provider.go | 1 + .../data_providers/block_headers_provider.go | 1 + .../data_providers/blocks_provider.go | 1 + .../data_providers/data_provider.go | 4 ++++ .../data_providers/events_provider.go | 1 + .../websockets/data_providers/factory_test.go | 1 + .../data_providers/mock/data_provider.go | 21 +++++++++++++++++++ ...d_and_get_transaction_statuses_provider.go | 1 + .../transaction_statuses_provider.go | 1 + 12 files changed, 43 insertions(+) diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index 8fe04dd8e87..a58c1f08c4f 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -449,6 +449,7 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis subs = append(subs, &models.SubscriptionEntry{ SubscriptionID: id.String(), Topic: provider.Topic(), + Arguments: provider.Arguments(), }) return nil }) diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 5b3c112fd09..5600bffea5c 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -68,6 +68,7 @@ func NewAccountStatusesDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments. diff --git a/engine/access/rest/websockets/data_providers/base_provider.go b/engine/access/rest/websockets/data_providers/base_provider.go index 8936a883161..71a73e49fc9 100644 --- a/engine/access/rest/websockets/data_providers/base_provider.go +++ b/engine/access/rest/websockets/data_providers/base_provider.go @@ -5,6 +5,7 @@ import ( "github.com/google/uuid" + "github.com/onflow/flow-go/engine/access/rest/websockets/models" "github.com/onflow/flow-go/engine/access/subscription" ) @@ -12,6 +13,7 @@ import ( type baseDataProvider struct { subscriptionID uuid.UUID topic string + arguments models.Arguments cancel context.CancelFunc send chan<- interface{} subscription subscription.Subscription @@ -21,6 +23,7 @@ type baseDataProvider struct { func newBaseDataProvider( subscriptionID uuid.UUID, topic string, + arguments models.Arguments, cancel context.CancelFunc, send chan<- interface{}, subscription subscription.Subscription, @@ -28,6 +31,7 @@ func newBaseDataProvider( return &baseDataProvider{ subscriptionID: subscriptionID, topic: topic, + arguments: arguments, cancel: cancel, send: send, subscription: subscription, @@ -44,6 +48,11 @@ func (b *baseDataProvider) Topic() string { return b.topic } +// Arguments returns the arguments associated with the data provider. +func (b *baseDataProvider) Arguments() models.Arguments { + return b.arguments +} + // Close terminates the data provider. // // No errors are expected during normal operations. diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider.go b/engine/access/rest/websockets/data_providers/block_digests_provider.go index e00f164972e..6bd68bcabd1 100644 --- a/engine/access/rest/websockets/data_providers/block_digests_provider.go +++ b/engine/access/rest/websockets/data_providers/block_digests_provider.go @@ -49,6 +49,7 @@ func NewBlockDigestsDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, blockArgs), // Set up a subscription to block digests based on arguments. diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider.go b/engine/access/rest/websockets/data_providers/block_headers_provider.go index 582b9cb0924..5a971d801f9 100644 --- a/engine/access/rest/websockets/data_providers/block_headers_provider.go +++ b/engine/access/rest/websockets/data_providers/block_headers_provider.go @@ -49,6 +49,7 @@ func NewBlockHeadersDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, blockArgs), // Set up a subscription to block headers based on arguments. diff --git a/engine/access/rest/websockets/data_providers/blocks_provider.go b/engine/access/rest/websockets/data_providers/blocks_provider.go index 49c5ee43cd2..15f18b5bd24 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider.go @@ -58,6 +58,7 @@ func NewBlocksDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, blockArgs), // Set up a subscription to blocks based on arguments. diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go index ed6c11b0f0d..5ebe0c4f20b 100644 --- a/engine/access/rest/websockets/data_providers/data_provider.go +++ b/engine/access/rest/websockets/data_providers/data_provider.go @@ -2,6 +2,8 @@ package data_providers import ( "github.com/google/uuid" + + "github.com/onflow/flow-go/engine/access/rest/websockets/models" ) // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector. @@ -11,6 +13,8 @@ type DataProvider interface { ID() uuid.UUID // Topic returns the topic associated with the data provider. Topic() string + // Arguments returns the arguments associated with the data provider. + Arguments() models.Arguments // Close terminates the data provider. // // No errors are expected during normal operations. diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 2f22ff6cbaf..2471079716a 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -67,6 +67,7 @@ func NewEventsDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, eventArgs), // Set up a subscription to events based on arguments. diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index c6dd8fb930f..bb2d4a7a59a 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -166,6 +166,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.Require().NotNil(provider, "Expected provider for topic %s", test.topic) s.Require().NoError(err, "Expected no error for topic %s", test.topic) s.Require().Equal(test.topic, provider.Topic()) + s.Require().Equal(test.arguments, provider.Arguments()) test.assertExpectations() }) diff --git a/engine/access/rest/websockets/data_providers/mock/data_provider.go b/engine/access/rest/websockets/data_providers/mock/data_provider.go index 48debb23ae3..1e954fa89f8 100644 --- a/engine/access/rest/websockets/data_providers/mock/data_provider.go +++ b/engine/access/rest/websockets/data_providers/mock/data_provider.go @@ -4,6 +4,7 @@ package mock import ( uuid "github.com/google/uuid" + models "github.com/onflow/flow-go/engine/access/rest/websockets/models" mock "github.com/stretchr/testify/mock" ) @@ -12,6 +13,26 @@ type DataProvider struct { mock.Mock } +// Arguments provides a mock function with given fields: +func (_m *DataProvider) Arguments() models.Arguments { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Arguments") + } + + var r0 models.Arguments + if rf, ok := ret.Get(0).(func() models.Arguments); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(models.Arguments) + } + } + + return r0 +} + // Close provides a mock function with given fields: func (_m *DataProvider) Close() { _m.Called() diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go index dceaaf5899e..71c15e85990 100644 --- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go @@ -59,6 +59,7 @@ func NewSendAndGetTransactionStatusesDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, sendTxStatusesArgs), // Set up a subscription to tx statuses based on arguments. diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index e18a9d3a1f0..77010e51c09 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -62,6 +62,7 @@ func NewTransactionStatusesDataProvider( p.baseDataProvider = newBaseDataProvider( subscriptionID, topic, + arguments, cancel, send, p.createSubscription(subCtx, txStatusesArgs), // Set up a subscription to tx statuses based on arguments. From acb6b02204541a8f67fed9131f827a42511f5460 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv <u.andrukhiv@gmail.com> Date: Mon, 13 Jan 2025 13:23:23 +0200 Subject: [PATCH 2/9] Updatated TestListSubscriptions unit test --- engine/access/rest/websockets/controller_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index ace74b8c8e1..580d64ea8a6 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -463,7 +463,9 @@ func (s *WsControllerSuite) TestListSubscriptions() { done := make(chan struct{}) topic := dp.BlocksTopic + arguments := models.Arguments{} dataProvider.On("Topic").Return(topic) + dataProvider.On("Arguments").Return(arguments) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -509,6 +511,7 @@ func (s *WsControllerSuite) TestListSubscriptions() { require.Equal(t, 1, len(response.Subscriptions)) require.Equal(t, subscriptionID, response.Subscriptions[0].SubscriptionID) require.Equal(t, topic, response.Subscriptions[0].Topic) + require.Equal(t, arguments, response.Subscriptions[0].Arguments) return websocket.ErrCloseSent }). From 0e3d1cfebfac08fb2ddfe6f3f8cff4a15c0edd47 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv <u.andrukhiv@gmail.com> Date: Wed, 15 Jan 2025 14:17:24 +0200 Subject: [PATCH 3/9] Reverted changes from illia-malachyn/6845-unify-subscription-and-message-id to avoid pendency on it --- engine/access/rest/websockets/controller.go | 98 +++++------ .../access/rest/websockets/controller_test.go | 161 +++++++++++------- .../account_statuses_provider.go | 3 - .../account_statuses_provider_test.go | 11 +- .../data_providers/base_provider.go | 29 ++-- .../data_providers/block_digests_provider.go | 3 - .../block_digests_provider_test.go | 3 +- .../data_providers/block_headers_provider.go | 3 - .../block_headers_provider_test.go | 3 +- .../data_providers/blocks_provider.go | 3 - .../data_providers/blocks_provider_test.go | 3 +- .../data_providers/data_provider.go | 2 +- .../data_providers/events_provider.go | 3 - .../data_providers/events_provider_test.go | 11 +- .../rest/websockets/data_providers/factory.go | 18 +- .../websockets/data_providers/factory_test.go | 5 +- .../mock/data_provider_factory.go | 20 +-- ...d_and_get_transaction_statuses_provider.go | 3 - ..._get_transaction_statuses_provider_test.go | 2 - .../transaction_statuses_provider.go | 3 - .../transaction_statuses_provider_test.go | 3 - .../websockets/data_providers/unit_test.go | 8 +- engine/access/rest/websockets/error_codes.go | 10 ++ .../rest/websockets/models/account_models.go | 2 +- .../rest/websockets/models/base_message.go | 17 +- .../rest/websockets/models/error_message.go | 7 + .../websockets/models/list_subscriptions.go | 6 +- .../websockets/models/subscribe_message.go | 2 +- .../websockets/models/subscription_entry.go | 6 +- .../websockets/models/unsubscribe_message.go | 2 +- 30 files changed, 206 insertions(+), 244 deletions(-) create mode 100644 engine/access/rest/websockets/error_codes.go create mode 100644 engine/access/rest/websockets/models/error_message.go diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go index 75f8b764f8e..26c1c977cbf 100644 --- a/engine/access/rest/websockets/controller.go +++ b/engine/access/rest/websockets/controller.go @@ -77,7 +77,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "sync" "time" @@ -247,7 +246,7 @@ func (c *Controller) keepalive(ctx context.Context) error { // If no messages are sent within InactivityTimeout and no active data providers exist, // the connection will be closed. func (c *Controller) writeMessages(ctx context.Context) error { - inactivityTicker := time.NewTicker(c.inactivityTickerPeriod()) + inactivityTicker := time.NewTicker(c.config.InactivityTimeout / 10) defer inactivityTicker.Stop() lastMessageSentAt := time.Now() @@ -302,10 +301,6 @@ func (c *Controller) writeMessages(ctx context.Context) error { } } -func (c *Controller) inactivityTickerPeriod() time.Duration { - return c.config.InactivityTimeout / 10 -} - // readMessages continuously reads messages from a client WebSocket connection, // validates each message, and processes it based on the message type. func (c *Controller) readMessages(ctx context.Context) error { @@ -319,8 +314,7 @@ func (c *Controller) readMessages(ctx context.Context) error { c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusBadRequest, "error reading message", ""), - ) + wrapErrorMessage(InvalidMessage, "error reading message", "", "", "")) continue } @@ -329,8 +323,7 @@ func (c *Controller) readMessages(ctx context.Context) error { c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusBadRequest, "error parsing message", ""), - ) + wrapErrorMessage(InvalidMessage, "error parsing message", "", "", "")) continue } } @@ -373,32 +366,24 @@ func (c *Controller) handleMessage(ctx context.Context, message json.RawMessage) } func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) { - subscriptionID, err := c.parseOrCreateSubscriptionID(msg.SubscriptionID) - if err != nil { - c.writeErrorResponse( - ctx, - err, - wrapErrorMessage(http.StatusBadRequest, "error parsing subscription id", msg.SubscriptionID), - ) - return - } - // register new provider - provider, err := c.dataProviderFactory.NewDataProvider(ctx, subscriptionID, msg.Topic, msg.Arguments, c.multiplexedStream) + provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream) if err != nil { c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusBadRequest, "error creating data provider", subscriptionID.String()), + wrapErrorMessage(InvalidArgument, "error creating data provider", msg.ClientMessageID, models.SubscribeAction, ""), ) return } - c.dataProviders.Add(subscriptionID, provider) + c.dataProviders.Add(provider.ID(), provider) // write OK response to client responseOk := models.SubscribeMessageResponse{ BaseMessageResponse: models.BaseMessageResponse{ - SubscriptionID: subscriptionID.String(), + ClientMessageID: msg.ClientMessageID, + Success: true, + SubscriptionID: provider.ID().String(), }, } c.writeResponse(ctx, responseOk) @@ -411,65 +396,73 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusInternalServerError, "internal error", subscriptionID.String()), + wrapErrorMessage(SubscriptionError, "subscription finished with error", "", "", ""), ) } c.dataProvidersGroup.Done() - c.dataProviders.Remove(subscriptionID) + c.dataProviders.Remove(provider.ID()) }() } func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) { - subscriptionID, err := uuid.Parse(msg.SubscriptionID) + id, err := uuid.Parse(msg.SubscriptionID) if err != nil { c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusBadRequest, "error parsing subscription id", msg.SubscriptionID), + wrapErrorMessage(InvalidArgument, "error parsing subscription ID", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID), ) return } - provider, ok := c.dataProviders.Get(subscriptionID) + provider, ok := c.dataProviders.Get(id) if !ok { c.writeErrorResponse( ctx, err, - wrapErrorMessage(http.StatusNotFound, "subscription not found", subscriptionID.String()), + wrapErrorMessage(NotFound, "subscription not found", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID), ) return } provider.Close() - c.dataProviders.Remove(subscriptionID) + c.dataProviders.Remove(id) responseOk := models.UnsubscribeMessageResponse{ BaseMessageResponse: models.BaseMessageResponse{ - SubscriptionID: subscriptionID.String(), + ClientMessageID: msg.ClientMessageID, + Success: true, + SubscriptionID: msg.SubscriptionID, }, } c.writeResponse(ctx, responseOk) } -func (c *Controller) handleListSubscriptions(ctx context.Context, _ models.ListSubscriptionsMessageRequest) { +func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.ListSubscriptionsMessageRequest) { var subs []*models.SubscriptionEntry err := c.dataProviders.ForEach(func(id uuid.UUID, provider dp.DataProvider) error { subs = append(subs, &models.SubscriptionEntry{ - SubscriptionID: id.String(), - Topic: provider.Topic(), - Arguments: provider.Arguments(), + ID: id.String(), + Topic: provider.Topic(), + Arguments: provider.Arguments(), }) return nil }) - // intentionally ignored, this never happens if err != nil { - c.logger.Debug().Err(err).Msg("error listing subscriptions") + c.writeErrorResponse( + ctx, + err, + wrapErrorMessage(NotFound, "error listing subscriptions", msg.ClientMessageID, models.ListSubscriptionsAction, ""), + ) + return } responseOk := models.ListSubscriptionsMessageResponse{ - Subscriptions: subs, + Success: true, + ClientMessageID: msg.ClientMessageID, + Subscriptions: subs, } c.writeResponse(ctx, responseOk) } @@ -506,30 +499,15 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) { } } -func wrapErrorMessage(code int, message string, subscriptionID string) models.BaseMessageResponse { +func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse { return models.BaseMessageResponse{ - SubscriptionID: subscriptionID, + ClientMessageID: msgId, + Success: false, + SubscriptionID: subscriptionID, Error: models.ErrorMessage{ - Code: code, + Code: int(code), Message: message, + Action: action, }, } } - -func (c *Controller) parseOrCreateSubscriptionID(id string) (uuid.UUID, error) { - // if client didn't provide subscription id, we create one for them - if id == "" { - return uuid.New(), nil - } - - newID, err := uuid.Parse(id) - if err != nil { - return uuid.Nil, err - } - - if c.dataProviders.Has(newID) { - return uuid.Nil, fmt.Errorf("subscription id is already in use") - } - - return newID, nil -} diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index c6a9650cc90..6e0c618879f 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "net/http" "testing" "time" @@ -52,11 +51,14 @@ func (s *WsControllerSuite) TestSubscribeRequest() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() done := make(chan struct{}) + + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -69,8 +71,8 @@ func (s *WsControllerSuite) TestSubscribeRequest() { request := models.SubscribeMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: uuid.New().String(), - Action: models.SubscribeAction, + ClientMessageID: uuid.New().String(), + Action: models.SubscribeAction, }, Topic: dp.BlocksTopic, Arguments: nil, @@ -96,7 +98,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() { response, ok := msg.(models.SubscribeMessageResponse) require.True(t, ok) - require.Equal(t, request.SubscriptionID, response.SubscriptionID) + require.True(t, response.Success) + require.Equal(t, request.ClientMessageID, response.ClientMessageID) + require.Equal(t, id.String(), response.SubscriptionID) return websocket.ErrCloseSent }) @@ -144,8 +148,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() { response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) + require.False(t, response.Success) require.NotEmpty(t, response.Error) - require.Equal(t, http.StatusBadRequest, response.Error.Code) + require.Equal(t, int(InvalidMessage), response.Error.Code) return websocket.ErrCloseSent }) @@ -164,13 +169,12 @@ func (s *WsControllerSuite) TestSubscribeRequest() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("error creating data provider")). Once() done := make(chan struct{}) - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) + s.expectSubscribeRequest(t, conn) conn. On("WriteJSON", mock.Anything). @@ -179,8 +183,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() { response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) + require.False(t, response.Success) require.NotEmpty(t, response.Error) - require.Equal(t, http.StatusBadRequest, response.Error.Code) + require.Equal(t, int(InvalidArgument), response.Error.Code) return websocket.ErrCloseSent }) @@ -199,6 +204,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() { conn, dataProviderFactory, dataProvider := newControllerMocks(t) controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) + dataProvider.On("ID").Return(uuid.New()) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -208,14 +214,13 @@ func (s *WsControllerSuite) TestSubscribeRequest() { Once() dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() done := make(chan struct{}) - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) conn. On("WriteJSON", mock.Anything). @@ -224,8 +229,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() { response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) + require.False(t, response.Success) require.NotEmpty(t, response.Error) - require.Equal(t, http.StatusInternalServerError, response.Error.Code) + require.Equal(t, int(SubscriptionError), response.Error.Code) return websocket.ErrCloseSent }) @@ -248,11 +254,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() done := make(chan struct{}) + + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -263,15 +272,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) request := models.UnsubscribeMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: subscriptionID, - Action: models.UnsubscribeAction, + ClientMessageID: uuid.New().String(), + Action: models.UnsubscribeAction, }, + SubscriptionID: id.String(), } requestJson, err := json.Marshal(request) require.NoError(t, err) @@ -293,8 +302,9 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { response, ok := msg.(models.UnsubscribeMessageResponse) require.True(t, ok) + require.True(t, response.Success) require.Empty(t, response.Error) - require.Equal(t, request.SubscriptionID, response.SubscriptionID) + require.Equal(t, request.ClientMessageID, response.ClientMessageID) require.Equal(t, request.SubscriptionID, response.SubscriptionID) return websocket.ErrCloseSent @@ -317,11 +327,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() done := make(chan struct{}) + + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -332,15 +345,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) request := models.UnsubscribeMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: "invalid-uuid", - Action: models.UnsubscribeAction, + ClientMessageID: uuid.New().String(), + Action: models.UnsubscribeAction, }, + SubscriptionID: "invalid-uuid", } requestJson, err := json.Marshal(request) require.NoError(t, err) @@ -362,9 +375,10 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) + require.False(t, response.Success) require.NotEmpty(t, response.Error) - require.Equal(t, request.SubscriptionID, response.SubscriptionID) - require.Equal(t, http.StatusBadRequest, response.Error.Code) + require.Equal(t, request.ClientMessageID, response.ClientMessageID) + require.Equal(t, int(InvalidArgument), response.Error.Code) return websocket.ErrCloseSent }). @@ -386,11 +400,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() done := make(chan struct{}) + + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() dataProvider. @@ -401,15 +418,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { Return(nil). Once() - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) request := models.UnsubscribeMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: uuid.New().String(), // unknown subscription id - Action: models.UnsubscribeAction, + ClientMessageID: uuid.New().String(), + Action: models.UnsubscribeAction, }, + SubscriptionID: uuid.New().String(), } requestJson, err := json.Marshal(request) require.NoError(t, err) @@ -431,10 +448,11 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { response, ok := msg.(models.BaseMessageResponse) require.True(t, ok) - require.Equal(t, request.SubscriptionID, response.SubscriptionID) - + require.False(t, response.Success) require.NotEmpty(t, response.Error) - require.Equal(t, http.StatusNotFound, response.Error.Code) + + require.Equal(t, request.ClientMessageID, response.ClientMessageID) + require.Equal(t, int(NotFound), response.Error.Code) return websocket.ErrCloseSent }). @@ -457,14 +475,16 @@ func (s *WsControllerSuite) TestListSubscriptions() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() done := make(chan struct{}) + id := uuid.New() topic := dp.BlocksTopic arguments := models.Arguments{} + dataProvider.On("ID").Return(id) dataProvider.On("Topic").Return(topic) dataProvider.On("Arguments").Return(arguments) // data provider might finish on its own or controller will close it via Close() @@ -477,14 +497,13 @@ func (s *WsControllerSuite) TestListSubscriptions() { Return(nil). Once() - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) request := models.ListSubscriptionsMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: "", - Action: models.ListSubscriptionsAction, + ClientMessageID: uuid.New().String(), + Action: models.ListSubscriptionsAction, }, } requestJson, err := json.Marshal(request) @@ -507,8 +526,11 @@ func (s *WsControllerSuite) TestListSubscriptions() { response, ok := msg.(models.ListSubscriptionsMessageResponse) require.True(t, ok) + require.True(t, response.Success) + require.Empty(t, response.Error) + require.Equal(t, request.ClientMessageID, response.ClientMessageID) require.Equal(t, 1, len(response.Subscriptions)) - require.Equal(t, subscriptionID, response.Subscriptions[0].SubscriptionID) + require.Equal(t, id.String(), response.Subscriptions[0].ID) require.Equal(t, topic, response.Subscriptions[0].Topic) require.Equal(t, arguments, response.Subscriptions[0].Arguments) @@ -535,10 +557,12 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() @@ -553,9 +577,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { Once() done := make(chan struct{}) - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) // Expect a valid block to be passed to WriteJSON. // If we got to this point, the controller executed all its logic properly @@ -589,10 +612,12 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() @@ -609,9 +634,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() { Once() done := make(chan struct{}) - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) i := 0 actualBlocks := make([]*flow.Block, len(expectedBlocks)) @@ -793,10 +817,12 @@ func (s *WsControllerSuite) TestControllerShutdown() { controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory) dataProviderFactory. - On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(dataProvider, nil). Once() + id := uuid.New() + dataProvider.On("ID").Return(id) // data provider might finish on its own or controller will close it via Close() dataProvider.On("Close").Return(nil).Maybe() @@ -809,9 +835,8 @@ func (s *WsControllerSuite) TestControllerShutdown() { Once() done := make(chan struct{}) - subscriptionID := uuid.New().String() - s.expectSubscribeRequest(t, conn, subscriptionID) - s.expectSubscribeResponse(t, conn, subscriptionID) + msgID := s.expectSubscribeRequest(t, conn) + s.expectSubscribeResponse(t, conn, msgID) conn. On("WriteJSON", mock.Anything). @@ -871,14 +896,15 @@ func (s *WsControllerSuite) TestControllerShutdown() { conn. On("ReadJSON", mock.Anything). Return(func(interface{}) error { - // make sure the reader routine sleeps for more time than InactivityTimeout + inactivity ticker period. - // meanwhile, the writer routine must shut down the controller. - <-time.After(wsConfig.InactivityTimeout + controller.inactivityTickerPeriod()*2) + // waiting more than InactivityTimeout to make sure that read message routine busy and do not return + // an error before than inactivity tracker initiate shut down + <-time.After(wsConfig.InactivityTimeout) return websocket.ErrCloseSent }). Once() controller.HandleConnection(context.Background()) + time.Sleep(wsConfig.InactivityTimeout) conn.AssertExpectations(t) }) @@ -970,11 +996,11 @@ func newControllerMocks(t *testing.T) (*connmock.WebsocketConnection, *dpmock.Da } // expectSubscribeRequest mocks the client's subscription request. -func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock.WebsocketConnection, subscriptionID string) { +func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock.WebsocketConnection) string { request := models.SubscribeMessageRequest{ BaseMessageRequest: models.BaseMessageRequest{ - SubscriptionID: subscriptionID, - Action: models.SubscribeAction, + ClientMessageID: uuid.New().String(), + Action: models.SubscribeAction, }, Topic: dp.BlocksTopic, } @@ -991,16 +1017,19 @@ func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock. }). Return(nil). Once() + + return request.ClientMessageID } // expectSubscribeResponse mocks the subscription response sent to the client. -func (s *WsControllerSuite) expectSubscribeResponse(t *testing.T, conn *connmock.WebsocketConnection, subscriptionID string) { +func (s *WsControllerSuite) expectSubscribeResponse(t *testing.T, conn *connmock.WebsocketConnection, msgId string) { conn. On("WriteJSON", mock.Anything). Run(func(args mock.Arguments) { response, ok := args.Get(0).(models.SubscribeMessageResponse) require.True(t, ok) - require.Equal(t, subscriptionID, response.SubscriptionID) + require.Equal(t, msgId, response.ClientMessageID) + require.Equal(t, true, response.Success) }). Return(nil). Once() diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go index 3615fc297fd..d2b368a33b0 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,7 +41,6 @@ func NewAccountStatusesDataProvider( ctx context.Context, logger zerolog.Logger, stateStreamApi state_stream.API, - subscriptionID uuid.UUID, topic string, arguments models.Arguments, send chan<- interface{}, @@ -65,7 +63,6 @@ func NewAccountStatusesDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go index 87b177d33f4..157c50e7deb 100644 --- a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go @@ -4,9 +4,7 @@ import ( "context" "strconv" "testing" - "time" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -178,7 +176,6 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidAr ctx, s.log, s.api, - uuid.New(), topic, test.arguments, send, @@ -206,7 +203,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe // Create a mock subscription and mock the channel sub := ssmock.NewSubscription(s.T()) sub.On("Channel").Return((<-chan interface{})(accountStatusesChan)) - sub.On("Err").Return(nil).Once() + sub.On("Err").Return(nil) s.api.On("SubscribeAccountStatusesFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) @@ -220,7 +217,6 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe ctx, s.log, s.api, - uuid.New(), topic, arguments, send, @@ -235,9 +231,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe defer provider.Close() // Run the provider in a separate goroutine to simulate subscription processing - done := make(chan struct{}) go func() { - defer close(done) err = provider.Run() s.Require().NoError(err) }() @@ -260,9 +254,6 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe responses = append(responses, accountStatusesRes) } - // Wait for the provider goroutine to finish - unittest.RequireCloseBefore(s.T(), done, time.Second, "provider failed to stop") - // Verifying that indices are starting from 0 s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0") diff --git a/engine/access/rest/websockets/data_providers/base_provider.go b/engine/access/rest/websockets/data_providers/base_provider.go index 71a73e49fc9..9387785a2d6 100644 --- a/engine/access/rest/websockets/data_providers/base_provider.go +++ b/engine/access/rest/websockets/data_providers/base_provider.go @@ -11,17 +11,16 @@ import ( // baseDataProvider holds common objects for the provider type baseDataProvider struct { - subscriptionID uuid.UUID - topic string - arguments models.Arguments - cancel context.CancelFunc - send chan<- interface{} - subscription subscription.Subscription + id uuid.UUID + topic string + arguments models.Arguments + cancel context.CancelFunc + send chan<- interface{} + subscription subscription.Subscription } // newBaseDataProvider creates a new instance of baseDataProvider. func newBaseDataProvider( - subscriptionID uuid.UUID, topic string, arguments models.Arguments, cancel context.CancelFunc, @@ -29,18 +28,18 @@ func newBaseDataProvider( subscription subscription.Subscription, ) *baseDataProvider { return &baseDataProvider{ - subscriptionID: subscriptionID, - topic: topic, - arguments: arguments, - cancel: cancel, - send: send, - subscription: subscription, + id: uuid.New(), + topic: topic, + arguments: arguments, + cancel: cancel, + send: send, + subscription: subscription, } } -// ID returns the subscription ID associated with current data provider +// ID returns the unique identifier of the data provider. func (b *baseDataProvider) ID() uuid.UUID { - return b.subscriptionID + return b.id } // Topic returns the topic associated with the data provider. diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider.go b/engine/access/rest/websockets/data_providers/block_digests_provider.go index 144f70751c2..8e08d9b95e1 100644 --- a/engine/access/rest/websockets/data_providers/block_digests_provider.go +++ b/engine/access/rest/websockets/data_providers/block_digests_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" @@ -29,7 +28,6 @@ func NewBlockDigestsDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, - subscriptionID uuid.UUID, topic string, arguments models.Arguments, send chan<- interface{}, @@ -47,7 +45,6 @@ func NewBlockDigestsDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider_test.go b/engine/access/rest/websockets/data_providers/block_digests_provider_test.go index 6bc18ee0650..57576fe60a0 100644 --- a/engine/access/rest/websockets/data_providers/block_digests_provider_test.go +++ b/engine/access/rest/websockets/data_providers/block_digests_provider_test.go @@ -5,7 +5,6 @@ import ( "strconv" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -44,7 +43,7 @@ func (s *BlockDigestsProviderSuite) TestBlockDigestsDataProvider_InvalidArgument for _, test := range s.invalidArgumentsTestCases() { s.Run(test.name, func() { - provider, err := NewBlockDigestsDataProvider(ctx, s.log, s.api, uuid.New(), topic, test.arguments, send) + provider, err := NewBlockDigestsDataProvider(ctx, s.log, s.api, topic, test.arguments, send) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider.go b/engine/access/rest/websockets/data_providers/block_headers_provider.go index fe378c9e2ae..a2e74c1ad2a 100644 --- a/engine/access/rest/websockets/data_providers/block_headers_provider.go +++ b/engine/access/rest/websockets/data_providers/block_headers_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" @@ -30,7 +29,6 @@ func NewBlockHeadersDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, - subscriptionID uuid.UUID, topic string, arguments models.Arguments, send chan<- interface{}, @@ -48,7 +46,6 @@ func NewBlockHeadersDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider_test.go b/engine/access/rest/websockets/data_providers/block_headers_provider_test.go index f2b844117eb..9f71ae124f2 100644 --- a/engine/access/rest/websockets/data_providers/block_headers_provider_test.go +++ b/engine/access/rest/websockets/data_providers/block_headers_provider_test.go @@ -5,7 +5,6 @@ import ( "strconv" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -45,7 +44,7 @@ func (s *BlockHeadersProviderSuite) TestBlockHeadersDataProvider_InvalidArgument for _, test := range s.invalidArgumentsTestCases() { s.Run(test.name, func() { - provider, err := NewBlockHeadersDataProvider(ctx, s.log, s.api, uuid.New(), topic, test.arguments, send) + provider, err := NewBlockHeadersDataProvider(ctx, s.log, s.api, topic, test.arguments, send) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) diff --git a/engine/access/rest/websockets/data_providers/blocks_provider.go b/engine/access/rest/websockets/data_providers/blocks_provider.go index bc98b03a314..b8fc0cfbf99 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" @@ -41,7 +40,6 @@ func NewBlocksDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, - subscriptionID uuid.UUID, linkGenerator commonmodels.LinkGenerator, topic string, arguments models.Arguments, @@ -62,7 +60,6 @@ func NewBlocksDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/blocks_provider_test.go b/engine/access/rest/websockets/data_providers/blocks_provider_test.go index d75e7fb32aa..e73dc99df1d 100644 --- a/engine/access/rest/websockets/data_providers/blocks_provider_test.go +++ b/engine/access/rest/websockets/data_providers/blocks_provider_test.go @@ -6,7 +6,6 @@ import ( "strconv" "testing" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -132,7 +131,7 @@ func (s *BlocksProviderSuite) TestBlocksDataProvider_InvalidArguments() { for _, test := range s.invalidArgumentsTestCases() { s.Run(test.name, func() { - provider, err := NewBlocksDataProvider(ctx, s.log, s.api, uuid.New(), nil, BlocksTopic, test.arguments, send) + provider, err := NewBlocksDataProvider(ctx, s.log, s.api, nil, BlocksTopic, test.arguments, send) s.Require().Nil(provider) s.Require().Error(err) s.Require().Contains(err.Error(), test.expectedErrorMsg) diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go index 5ebe0c4f20b..cfa29853908 100644 --- a/engine/access/rest/websockets/data_providers/data_provider.go +++ b/engine/access/rest/websockets/data_providers/data_provider.go @@ -7,7 +7,7 @@ import ( ) // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector. -// It provides methods for retrieving the provider's unique SubscriptionID, topic, and a methods to close and run the provider. +// It provides methods for retrieving the provider's unique ID, topic, and a methods to close and run the provider. type DataProvider interface { // ID returns the unique identifier of the data provider. ID() uuid.UUID diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go index 9a9ce8c5edf..aa08ad71b87 100644 --- a/engine/access/rest/websockets/data_providers/events_provider.go +++ b/engine/access/rest/websockets/data_providers/events_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine/access/rest/common/parser" @@ -41,7 +40,6 @@ func NewEventsDataProvider( ctx context.Context, logger zerolog.Logger, stateStreamApi state_stream.API, - subscriptionID uuid.UUID, topic string, arguments models.Arguments, send chan<- interface{}, @@ -64,7 +62,6 @@ func NewEventsDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go index 4089f23ea8c..ab9f6110820 100644 --- a/engine/access/rest/websockets/data_providers/events_provider_test.go +++ b/engine/access/rest/websockets/data_providers/events_provider_test.go @@ -5,9 +5,7 @@ import ( "fmt" "strconv" "testing" - "time" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -204,7 +202,6 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() { ctx, s.log, s.api, - uuid.New(), topic, test.arguments, send, @@ -232,7 +229,7 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() // Create a mock subscription and mock the channel sub := ssmock.NewSubscription(s.T()) sub.On("Channel").Return((<-chan interface{})(eventChan)) - sub.On("Err").Return(nil).Once() + sub.On("Err").Return(nil) s.api.On("SubscribeEventsFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub) @@ -246,7 +243,6 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() ctx, s.log, s.api, - uuid.New(), topic, arguments, send, @@ -262,9 +258,7 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() defer provider.Close() // Run the provider in a separate goroutine to simulate subscription processing - done := make(chan struct{}) go func() { - defer close(done) err = provider.Run() s.Require().NoError(err) }() @@ -289,9 +283,6 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath() responses = append(responses, eventRes) } - // Wait for the provider goroutine to finish - unittest.RequireCloseBefore(s.T(), done, time.Second, "provider failed to stop") - // Verifying that indices are starting from 1 s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0") diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go index 38f5fda6da1..cb70b8bcb79 100644 --- a/engine/access/rest/websockets/data_providers/factory.go +++ b/engine/access/rest/websockets/data_providers/factory.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" @@ -34,7 +33,7 @@ type DataProviderFactory interface { // and configuration parameters. // // No errors are expected during normal operations. - NewDataProvider(ctx context.Context, subscriptionID uuid.UUID, topic string, args models.Arguments, ch chan<- interface{}) (DataProvider, error) + NewDataProvider(ctx context.Context, topic string, args models.Arguments, ch chan<- interface{}) (DataProvider, error) } var _ DataProviderFactory = (*DataProviderFactoryImpl)(nil) @@ -94,26 +93,25 @@ func NewDataProviderFactory( // No errors are expected during normal operations. func (s *DataProviderFactoryImpl) NewDataProvider( ctx context.Context, - subscriptionID uuid.UUID, topic string, arguments models.Arguments, ch chan<- interface{}, ) (DataProvider, error) { switch topic { case BlocksTopic: - return NewBlocksDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch) + return NewBlocksDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch) case BlockHeadersTopic: - return NewBlockHeadersDataProvider(ctx, s.logger, s.accessApi, subscriptionID, topic, arguments, ch) + return NewBlockHeadersDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case BlockDigestsTopic: - return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, subscriptionID, topic, arguments, ch) + return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch) case EventsTopic: - return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, subscriptionID, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) + return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case AccountStatusesTopic: - return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, subscriptionID, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) + return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval) case TransactionStatusesTopic: - return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch) + return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch) case SendAndGetTransactionStatusesTopic: - return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch) + return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch) default: return nil, fmt.Errorf("unsupported topic \"%s\"", topic) } diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go index a15f58443e1..d3378b527ea 100644 --- a/engine/access/rest/websockets/data_providers/factory_test.go +++ b/engine/access/rest/websockets/data_providers/factory_test.go @@ -5,7 +5,6 @@ import ( "fmt" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -161,7 +160,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() { s.T().Parallel() test.setupSubscription() - provider, err := s.factory.NewDataProvider(s.ctx, uuid.New(), test.topic, test.arguments, s.ch) + provider, err := s.factory.NewDataProvider(s.ctx, test.topic, test.arguments, s.ch) s.Require().NotNil(provider, "Expected provider for topic %s", test.topic) s.Require().NoError(err, "Expected no error for topic %s", test.topic) s.Require().Equal(test.topic, provider.Topic()) @@ -184,7 +183,7 @@ func (s *DataProviderFactorySuite) TestUnsupportedTopics() { } for _, topic := range unsupportedTopics { - provider, err := s.factory.NewDataProvider(s.ctx, uuid.New(), topic, nil, s.ch) + provider, err := s.factory.NewDataProvider(s.ctx, topic, nil, s.ch) s.Require().Nil(provider, "Expected no provider for unsupported topic %s", topic) s.Require().Error(err, "Expected error for unsupported topic %s", topic) s.Require().EqualError(err, fmt.Sprintf("unsupported topic \"%s\"", topic)) diff --git a/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go b/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go index 7c7d4bc58c0..af49cb4e687 100644 --- a/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go +++ b/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go @@ -9,8 +9,6 @@ import ( mock "github.com/stretchr/testify/mock" models "github.com/onflow/flow-go/engine/access/rest/websockets/models" - - uuid "github.com/google/uuid" ) // DataProviderFactory is an autogenerated mock type for the DataProviderFactory type @@ -18,9 +16,9 @@ type DataProviderFactory struct { mock.Mock } -// NewDataProvider provides a mock function with given fields: ctx, subscriptionID, topic, args, ch -func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, subscriptionID uuid.UUID, topic string, args models.Arguments, ch chan<- interface{}) (data_providers.DataProvider, error) { - ret := _m.Called(ctx, subscriptionID, topic, args, ch) +// NewDataProvider provides a mock function with given fields: ctx, topic, args, ch +func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, topic string, args models.Arguments, ch chan<- interface{}) (data_providers.DataProvider, error) { + ret := _m.Called(ctx, topic, args, ch) if len(ret) == 0 { panic("no return value specified for NewDataProvider") @@ -28,19 +26,19 @@ func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, subscription var r0 data_providers.DataProvider var r1 error - if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) (data_providers.DataProvider, error)); ok { - return rf(ctx, subscriptionID, topic, args, ch) + if rf, ok := ret.Get(0).(func(context.Context, string, models.Arguments, chan<- interface{}) (data_providers.DataProvider, error)); ok { + return rf(ctx, topic, args, ch) } - if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) data_providers.DataProvider); ok { - r0 = rf(ctx, subscriptionID, topic, args, ch) + if rf, ok := ret.Get(0).(func(context.Context, string, models.Arguments, chan<- interface{}) data_providers.DataProvider); ok { + r0 = rf(ctx, topic, args, ch) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(data_providers.DataProvider) } } - if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) error); ok { - r1 = rf(ctx, subscriptionID, topic, args, ch) + if rf, ok := ret.Get(1).(func(context.Context, string, models.Arguments, chan<- interface{}) error); ok { + r1 = rf(ctx, topic, args, ch) } else { r1 = ret.Error(1) } diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go index e496792c9b2..9bfde0aef3a 100644 --- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -40,7 +39,6 @@ func NewSendAndGetTransactionStatusesDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, - subscriptionID uuid.UUID, linkGenerator commonmodels.LinkGenerator, topic string, arguments models.Arguments, @@ -61,7 +59,6 @@ func NewSendAndGetTransactionStatusesDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go index 75932ebde11..7fad80de987 100644 --- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go @@ -4,7 +4,6 @@ import ( "context" "testing" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -136,7 +135,6 @@ func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataPr ctx, s.log, s.api, - uuid.New(), s.linkGenerator, topic, test.arguments, diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go index 87fca705b44..de667664769 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go @@ -4,7 +4,6 @@ import ( "context" "fmt" - "github.com/google/uuid" "github.com/rs/zerolog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -43,7 +42,6 @@ func NewTransactionStatusesDataProvider( ctx context.Context, logger zerolog.Logger, api access.API, - subscriptionID uuid.UUID, linkGenerator commonmodels.LinkGenerator, topic string, arguments models.Arguments, @@ -64,7 +62,6 @@ func NewTransactionStatusesDataProvider( subCtx, cancel := context.WithCancel(ctx) p.baseDataProvider = newBaseDataProvider( - subscriptionID, topic, arguments, cancel, diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go index fa05becd429..72101f769f7 100644 --- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go +++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -188,7 +187,6 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_I ctx, s.log, s.api, - uuid.New(), s.linkGenerator, topic, test.arguments, @@ -283,7 +281,6 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr ctx, s.log, s.api, - uuid.New(), s.linkGenerator, topic, arguments, diff --git a/engine/access/rest/websockets/data_providers/unit_test.go b/engine/access/rest/websockets/data_providers/unit_test.go index b3ad732caa3..94d9534798f 100644 --- a/engine/access/rest/websockets/data_providers/unit_test.go +++ b/engine/access/rest/websockets/data_providers/unit_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/onflow/flow-go/engine/access/rest/websockets/models" @@ -64,7 +63,7 @@ func testHappyPath( test.setupBackend(sub) // Create the data provider instance - provider, err := factory.NewDataProvider(ctx, uuid.New(), topic, test.arguments, send) + provider, err := factory.NewDataProvider(ctx, topic, test.arguments, send) require.NotNil(t, provider) require.NoError(t, err) @@ -73,9 +72,7 @@ func testHappyPath( defer provider.Close() // Run the provider in a separate goroutine - done := make(chan struct{}) go func() { - defer close(done) err = provider.Run() require.NoError(t, err) }() @@ -86,9 +83,6 @@ func testHappyPath( sendData(dataChan) }() - // Wait for the provider goroutine to finish - unittest.RequireCloseBefore(t, done, time.Second, "provider failed to stop") - // Collect responses for i, expected := range test.expectedResponses { unittest.RequireReturnsBefore(t, func() { diff --git a/engine/access/rest/websockets/error_codes.go b/engine/access/rest/websockets/error_codes.go new file mode 100644 index 00000000000..fd206bed0b3 --- /dev/null +++ b/engine/access/rest/websockets/error_codes.go @@ -0,0 +1,10 @@ +package websockets + +type Code int + +const ( + InvalidMessage Code = iota + InvalidArgument + NotFound + SubscriptionError +) diff --git a/engine/access/rest/websockets/models/account_models.go b/engine/access/rest/websockets/models/account_models.go index bad5e155721..7ad7243a96c 100644 --- a/engine/access/rest/websockets/models/account_models.go +++ b/engine/access/rest/websockets/models/account_models.go @@ -2,7 +2,7 @@ package models // AccountStatusesResponse is the response message for 'events' topic. type AccountStatusesResponse struct { - BlockID string `json:"block_id"` + BlockID string `json:"blockID"` Height string `json:"height"` AccountEvents AccountEvents `json:"account_events"` MessageIndex uint64 `json:"message_index"` diff --git a/engine/access/rest/websockets/models/base_message.go b/engine/access/rest/websockets/models/base_message.go index dc26a2914b0..cdcd72eb1ed 100644 --- a/engine/access/rest/websockets/models/base_message.go +++ b/engine/access/rest/websockets/models/base_message.go @@ -8,19 +8,14 @@ const ( // BaseMessageRequest represents a base structure for incoming messages. type BaseMessageRequest struct { - // SubscriptionID is UUID generated by either client or server to uniquely identify subscription. - // It is empty for 'list_subscription' action - SubscriptionID string `json:"subscription_id,omitempty"` - Action string `json:"action"` // Action is an action to perform (e.g. 'subscribe' to some data) + Action string `json:"action"` // subscribe, unsubscribe or list_subscriptions + ClientMessageID string `json:"message_id"` // ClientMessageID is a uuid generated by client to identify request/response uniquely } // BaseMessageResponse represents a base structure for outgoing messages. type BaseMessageResponse struct { - SubscriptionID string `json:"subscription_id"` // SubscriptionID might be empty in case of error response - Error ErrorMessage `json:"error,omitempty"` // Error might be empty in case of OK response -} - -type ErrorMessage struct { - Code int `json:"code"` // Code is an error code that categorizes an error - Message string `json:"message"` + SubscriptionID string `json:"subscription_id"` + ClientMessageID string `json:"message_id,omitempty"` // ClientMessageID may be empty in case we send msg by ourselves (e.g. error occurred) + Success bool `json:"success"` + Error ErrorMessage `json:"error,omitempty"` } diff --git a/engine/access/rest/websockets/models/error_message.go b/engine/access/rest/websockets/models/error_message.go new file mode 100644 index 00000000000..d5c0670926f --- /dev/null +++ b/engine/access/rest/websockets/models/error_message.go @@ -0,0 +1,7 @@ +package models + +type ErrorMessage struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action,omitempty"` +} diff --git a/engine/access/rest/websockets/models/list_subscriptions.go b/engine/access/rest/websockets/models/list_subscriptions.go index 185fc65bbe0..4893a34b09d 100644 --- a/engine/access/rest/websockets/models/list_subscriptions.go +++ b/engine/access/rest/websockets/models/list_subscriptions.go @@ -8,6 +8,8 @@ type ListSubscriptionsMessageRequest struct { // ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests. // It contains a list of active subscriptions for the current WebSocket connection. type ListSubscriptionsMessageResponse struct { - // Subscription list might be empty in case of no active subscriptions - Subscriptions []*SubscriptionEntry `json:"subscriptions"` + ClientMessageID string `json:"message_id"` + Success bool `json:"success"` + Error ErrorMessage `json:"error,omitempty"` + Subscriptions []*SubscriptionEntry `json:"subscriptions,omitempty"` } diff --git a/engine/access/rest/websockets/models/subscribe_message.go b/engine/access/rest/websockets/models/subscribe_message.go index b4bd9e871da..532e4c6a987 100644 --- a/engine/access/rest/websockets/models/subscribe_message.go +++ b/engine/access/rest/websockets/models/subscribe_message.go @@ -6,7 +6,7 @@ type Arguments map[string]interface{} type SubscribeMessageRequest struct { BaseMessageRequest Topic string `json:"topic"` // Topic to subscribe to - Arguments Arguments `json:"arguments"` // Arguments are the arguments for the subscribed topic + Arguments Arguments `json:"arguments"` // Additional arguments for subscription } // SubscribeMessageResponse represents the response to a subscription request. diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go index 9a60ab1a0d9..e48cdac15bb 100644 --- a/engine/access/rest/websockets/models/subscription_entry.go +++ b/engine/access/rest/websockets/models/subscription_entry.go @@ -2,7 +2,7 @@ package models // SubscriptionEntry represents an active subscription entry. type SubscriptionEntry struct { - SubscriptionID string `json:"subscription_id"` // ID is a client generated UUID for subscription - Topic string `json:"topic"` // Topic of the subscription - Arguments Arguments `json:"arguments"` + Topic string `json:"topic,omitempty"` // Topic of the subscription + ID string `json:"id,omitempty"` // Unique subscription ID + Arguments Arguments `json:"arguments"` // Arguments of the subscription } diff --git a/engine/access/rest/websockets/models/unsubscribe_message.go b/engine/access/rest/websockets/models/unsubscribe_message.go index f72e6cb5c7b..1402189a601 100644 --- a/engine/access/rest/websockets/models/unsubscribe_message.go +++ b/engine/access/rest/websockets/models/unsubscribe_message.go @@ -2,8 +2,8 @@ package models // UnsubscribeMessageRequest represents a request to unsubscribe from a topic. type UnsubscribeMessageRequest struct { - // Note: subscription_id is mandatory for this request BaseMessageRequest + SubscriptionID string `json:"id"` } // UnsubscribeMessageResponse represents the response to an unsubscription request. From bacec0351c291dc68f4554edc58d10db0c34b421 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv <u.andrukhiv@gmail.com> Date: Wed, 15 Jan 2025 14:19:50 +0200 Subject: [PATCH 4/9] Updated godoc --- engine/access/rest/websockets/data_providers/data_provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go index cfa29853908..3dad24a3ff1 100644 --- a/engine/access/rest/websockets/data_providers/data_provider.go +++ b/engine/access/rest/websockets/data_providers/data_provider.go @@ -7,7 +7,7 @@ import ( ) // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector. -// It provides methods for retrieving the provider's unique ID, topic, and a methods to close and run the provider. +// It provides methods for retrieving the provider's unique ID, topic, arguments and a methods to close and run the provider. type DataProvider interface { // ID returns the unique identifier of the data provider. ID() uuid.UUID From b0c5ffa4ab7521dfaa2db958882bdff4f6000b59 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv <u.andrukhiv@gmail.com> Date: Thu, 16 Jan 2025 16:25:57 +0200 Subject: [PATCH 5/9] Removed omitempty from ID and Topic for SubscriptionEntry --- engine/access/rest/websockets/models/subscription_entry.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go index e48cdac15bb..6c88d80b59a 100644 --- a/engine/access/rest/websockets/models/subscription_entry.go +++ b/engine/access/rest/websockets/models/subscription_entry.go @@ -2,7 +2,7 @@ package models // SubscriptionEntry represents an active subscription entry. type SubscriptionEntry struct { - Topic string `json:"topic,omitempty"` // Topic of the subscription - ID string `json:"id,omitempty"` // Unique subscription ID - Arguments Arguments `json:"arguments"` // Arguments of the subscription + Topic string `json:"topic"` // Topic of the subscription + ID string `json:"id"` // Unique subscription ID + Arguments Arguments `json:"arguments"` // Arguments of the subscription } From d2e11f903cf18c0c71cb76dd44b227efea1f1aab Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk <andriyslisarchuk@gmail.com> Date: Thu, 23 Jan 2025 12:27:35 +0200 Subject: [PATCH 6/9] fixed remarks and tests --- engine/access/rest/websockets/controller_test.go | 8 ++++++-- .../access/rest/websockets/models/unsubscribe_message.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go index 60a01632d90..0f8b05ccd1f 100644 --- a/engine/access/rest/websockets/controller_test.go +++ b/engine/access/rest/websockets/controller_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/onflow/flow-go/engine/access/rest/common/parser" + "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/rs/zerolog" @@ -298,7 +300,6 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() { require.True(t, ok) require.Empty(t, response.Error) require.Equal(t, request.SubscriptionID, response.SubscriptionID) - require.Equal(t, request.SubscriptionID, response.SubscriptionID) return websocket.ErrCloseSent }). @@ -470,7 +471,10 @@ func (s *WsControllerSuite) TestListSubscriptions() { done := make(chan struct{}) topic := dp.BlocksTopic - arguments := models.Arguments{} + arguments := models.Arguments{ + "start_block_id": unittest.IdentifierFixture().String(), + "block_status": parser.Finalized, + } dataProvider.On("Topic").Return(topic) dataProvider.On("Arguments").Return(arguments) // data provider might finish on its own or controller will close it via Close() diff --git a/engine/access/rest/websockets/models/unsubscribe_message.go b/engine/access/rest/websockets/models/unsubscribe_message.go index 1402189a601..f72e6cb5c7b 100644 --- a/engine/access/rest/websockets/models/unsubscribe_message.go +++ b/engine/access/rest/websockets/models/unsubscribe_message.go @@ -2,8 +2,8 @@ package models // UnsubscribeMessageRequest represents a request to unsubscribe from a topic. type UnsubscribeMessageRequest struct { + // Note: subscription_id is mandatory for this request BaseMessageRequest - SubscriptionID string `json:"id"` } // UnsubscribeMessageResponse represents the response to an unsubscription request. From 899b691b852ead3155dbd9a85d0ef15cd7238e70 Mon Sep 17 00:00:00 2001 From: Andrii Slisarchuk <andriyslisarchuk@gmail.com> Date: Thu, 23 Jan 2025 12:29:43 +0200 Subject: [PATCH 7/9] remove error_codes.go --- engine/access/rest/websockets/error_codes.go | 10 ---------- 1 file changed, 10 deletions(-) delete mode 100644 engine/access/rest/websockets/error_codes.go diff --git a/engine/access/rest/websockets/error_codes.go b/engine/access/rest/websockets/error_codes.go deleted file mode 100644 index fd206bed0b3..00000000000 --- a/engine/access/rest/websockets/error_codes.go +++ /dev/null @@ -1,10 +0,0 @@ -package websockets - -type Code int - -const ( - InvalidMessage Code = iota - InvalidArgument - NotFound - SubscriptionError -) From 93bb91542ea8c50e6ecb888bff4c18d63b8a1887 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Thu, 23 Jan 2025 09:45:53 -0800 Subject: [PATCH 8/9] Apply suggestions from code review --- engine/access/rest/websockets/models/account_models.go | 2 +- engine/access/rest/websockets/models/subscription_entry.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/access/rest/websockets/models/account_models.go b/engine/access/rest/websockets/models/account_models.go index 7ad7243a96c..bad5e155721 100644 --- a/engine/access/rest/websockets/models/account_models.go +++ b/engine/access/rest/websockets/models/account_models.go @@ -2,7 +2,7 @@ package models // AccountStatusesResponse is the response message for 'events' topic. type AccountStatusesResponse struct { - BlockID string `json:"blockID"` + BlockID string `json:"block_id"` Height string `json:"height"` AccountEvents AccountEvents `json:"account_events"` MessageIndex uint64 `json:"message_index"` diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go index 48566bb090e..4656d9a5544 100644 --- a/engine/access/rest/websockets/models/subscription_entry.go +++ b/engine/access/rest/websockets/models/subscription_entry.go @@ -2,7 +2,7 @@ package models // SubscriptionEntry represents an active subscription entry. type SubscriptionEntry struct { + SubscriptionID string `json:"subscription_id"` // ID is a client generated UUID for subscription Topic string `json:"topic"` // Topic of the subscription - SubscriptionID string `json:"id"` // Unique subscription ID - Arguments Arguments `json:"arguments"` // Arguments of the subscription + Arguments Arguments `json:"arguments"` } From 7413e0281bdc30b827ed927256d028c623ec1436 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Thu, 23 Jan 2025 10:46:08 -0800 Subject: [PATCH 9/9] Update engine/access/rest/websockets/models/subscription_entry.go --- engine/access/rest/websockets/models/subscription_entry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go index 4656d9a5544..9a60ab1a0d9 100644 --- a/engine/access/rest/websockets/models/subscription_entry.go +++ b/engine/access/rest/websockets/models/subscription_entry.go @@ -3,6 +3,6 @@ package models // SubscriptionEntry represents an active subscription entry. type SubscriptionEntry struct { SubscriptionID string `json:"subscription_id"` // ID is a client generated UUID for subscription - Topic string `json:"topic"` // Topic of the subscription + Topic string `json:"topic"` // Topic of the subscription Arguments Arguments `json:"arguments"` }