From cb2fa7dc7a84da84568fa7865e1e388c0f46f2cb Mon Sep 17 00:00:00 2001
From: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Date: Mon, 13 Jan 2025 12:14:58 +0200
Subject: [PATCH 1/9] Added arguments getter for data providers, filled
 ListSubscriptionsMessageResponse with Arguments, updated unit tests

---
 engine/access/rest/websockets/controller.go   |  1 +
 .../account_statuses_provider.go              |  1 +
 .../data_providers/base_provider.go           |  9 ++++++++
 .../data_providers/block_digests_provider.go  |  1 +
 .../data_providers/block_headers_provider.go  |  1 +
 .../data_providers/blocks_provider.go         |  1 +
 .../data_providers/data_provider.go           |  4 ++++
 .../data_providers/events_provider.go         |  1 +
 .../websockets/data_providers/factory_test.go |  1 +
 .../data_providers/mock/data_provider.go      | 21 +++++++++++++++++++
 ...d_and_get_transaction_statuses_provider.go |  1 +
 .../transaction_statuses_provider.go          |  1 +
 12 files changed, 43 insertions(+)

diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go
index 8fe04dd8e87..a58c1f08c4f 100644
--- a/engine/access/rest/websockets/controller.go
+++ b/engine/access/rest/websockets/controller.go
@@ -449,6 +449,7 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
 		subs = append(subs, &models.SubscriptionEntry{
 			SubscriptionID: id.String(),
 			Topic:          provider.Topic(),
+			Arguments:      provider.Arguments(),
 		})
 		return nil
 	})
diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go
index 5b3c112fd09..5600bffea5c 100644
--- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go
@@ -68,6 +68,7 @@ func NewAccountStatusesDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/base_provider.go b/engine/access/rest/websockets/data_providers/base_provider.go
index 8936a883161..71a73e49fc9 100644
--- a/engine/access/rest/websockets/data_providers/base_provider.go
+++ b/engine/access/rest/websockets/data_providers/base_provider.go
@@ -5,6 +5,7 @@ import (
 
 	"github.com/google/uuid"
 
+	"github.com/onflow/flow-go/engine/access/rest/websockets/models"
 	"github.com/onflow/flow-go/engine/access/subscription"
 )
 
@@ -12,6 +13,7 @@ import (
 type baseDataProvider struct {
 	subscriptionID uuid.UUID
 	topic          string
+	arguments      models.Arguments
 	cancel         context.CancelFunc
 	send           chan<- interface{}
 	subscription   subscription.Subscription
@@ -21,6 +23,7 @@ type baseDataProvider struct {
 func newBaseDataProvider(
 	subscriptionID uuid.UUID,
 	topic string,
+	arguments models.Arguments,
 	cancel context.CancelFunc,
 	send chan<- interface{},
 	subscription subscription.Subscription,
@@ -28,6 +31,7 @@ func newBaseDataProvider(
 	return &baseDataProvider{
 		subscriptionID: subscriptionID,
 		topic:          topic,
+		arguments:      arguments,
 		cancel:         cancel,
 		send:           send,
 		subscription:   subscription,
@@ -44,6 +48,11 @@ func (b *baseDataProvider) Topic() string {
 	return b.topic
 }
 
+// Arguments returns the arguments associated with the data provider.
+func (b *baseDataProvider) Arguments() models.Arguments {
+	return b.arguments
+}
+
 // Close terminates the data provider.
 //
 // No errors are expected during normal operations.
diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider.go b/engine/access/rest/websockets/data_providers/block_digests_provider.go
index e00f164972e..6bd68bcabd1 100644
--- a/engine/access/rest/websockets/data_providers/block_digests_provider.go
+++ b/engine/access/rest/websockets/data_providers/block_digests_provider.go
@@ -49,6 +49,7 @@ func NewBlockDigestsDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, blockArgs), // Set up a subscription to block digests based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider.go b/engine/access/rest/websockets/data_providers/block_headers_provider.go
index 582b9cb0924..5a971d801f9 100644
--- a/engine/access/rest/websockets/data_providers/block_headers_provider.go
+++ b/engine/access/rest/websockets/data_providers/block_headers_provider.go
@@ -49,6 +49,7 @@ func NewBlockHeadersDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, blockArgs), // Set up a subscription to block headers based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/blocks_provider.go b/engine/access/rest/websockets/data_providers/blocks_provider.go
index 49c5ee43cd2..15f18b5bd24 100644
--- a/engine/access/rest/websockets/data_providers/blocks_provider.go
+++ b/engine/access/rest/websockets/data_providers/blocks_provider.go
@@ -58,6 +58,7 @@ func NewBlocksDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, blockArgs), // Set up a subscription to blocks based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go
index ed6c11b0f0d..5ebe0c4f20b 100644
--- a/engine/access/rest/websockets/data_providers/data_provider.go
+++ b/engine/access/rest/websockets/data_providers/data_provider.go
@@ -2,6 +2,8 @@ package data_providers
 
 import (
 	"github.com/google/uuid"
+
+	"github.com/onflow/flow-go/engine/access/rest/websockets/models"
 )
 
 // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector.
@@ -11,6 +13,8 @@ type DataProvider interface {
 	ID() uuid.UUID
 	// Topic returns the topic associated with the data provider.
 	Topic() string
+	// Arguments returns the arguments associated with the data provider.
+	Arguments() models.Arguments
 	// Close terminates the data provider.
 	//
 	// No errors are expected during normal operations.
diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go
index 2f22ff6cbaf..2471079716a 100644
--- a/engine/access/rest/websockets/data_providers/events_provider.go
+++ b/engine/access/rest/websockets/data_providers/events_provider.go
@@ -67,6 +67,7 @@ func NewEventsDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, eventArgs), // Set up a subscription to events based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go
index c6dd8fb930f..bb2d4a7a59a 100644
--- a/engine/access/rest/websockets/data_providers/factory_test.go
+++ b/engine/access/rest/websockets/data_providers/factory_test.go
@@ -166,6 +166,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
 			s.Require().NotNil(provider, "Expected provider for topic %s", test.topic)
 			s.Require().NoError(err, "Expected no error for topic %s", test.topic)
 			s.Require().Equal(test.topic, provider.Topic())
+			s.Require().Equal(test.arguments, provider.Arguments())
 
 			test.assertExpectations()
 		})
diff --git a/engine/access/rest/websockets/data_providers/mock/data_provider.go b/engine/access/rest/websockets/data_providers/mock/data_provider.go
index 48debb23ae3..1e954fa89f8 100644
--- a/engine/access/rest/websockets/data_providers/mock/data_provider.go
+++ b/engine/access/rest/websockets/data_providers/mock/data_provider.go
@@ -4,6 +4,7 @@ package mock
 
 import (
 	uuid "github.com/google/uuid"
+	models "github.com/onflow/flow-go/engine/access/rest/websockets/models"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -12,6 +13,26 @@ type DataProvider struct {
 	mock.Mock
 }
 
+// Arguments provides a mock function with given fields:
+func (_m *DataProvider) Arguments() models.Arguments {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for Arguments")
+	}
+
+	var r0 models.Arguments
+	if rf, ok := ret.Get(0).(func() models.Arguments); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(models.Arguments)
+		}
+	}
+
+	return r0
+}
+
 // Close provides a mock function with given fields:
 func (_m *DataProvider) Close() {
 	_m.Called()
diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
index dceaaf5899e..71c15e85990 100644
--- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
@@ -59,6 +59,7 @@ func NewSendAndGetTransactionStatusesDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, sendTxStatusesArgs), // Set up a subscription to tx statuses based on arguments.
diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
index e18a9d3a1f0..77010e51c09 100644
--- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
@@ -62,6 +62,7 @@ func NewTransactionStatusesDataProvider(
 	p.baseDataProvider = newBaseDataProvider(
 		subscriptionID,
 		topic,
+		arguments,
 		cancel,
 		send,
 		p.createSubscription(subCtx, txStatusesArgs), // Set up a subscription to tx statuses based on arguments.

From acb6b02204541a8f67fed9131f827a42511f5460 Mon Sep 17 00:00:00 2001
From: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Date: Mon, 13 Jan 2025 13:23:23 +0200
Subject: [PATCH 2/9] Updatated TestListSubscriptions unit test

---
 engine/access/rest/websockets/controller_test.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go
index ace74b8c8e1..580d64ea8a6 100644
--- a/engine/access/rest/websockets/controller_test.go
+++ b/engine/access/rest/websockets/controller_test.go
@@ -463,7 +463,9 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 		done := make(chan struct{})
 
 		topic := dp.BlocksTopic
+		arguments := models.Arguments{}
 		dataProvider.On("Topic").Return(topic)
+		dataProvider.On("Arguments").Return(arguments)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -509,6 +511,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 				require.Equal(t, 1, len(response.Subscriptions))
 				require.Equal(t, subscriptionID, response.Subscriptions[0].SubscriptionID)
 				require.Equal(t, topic, response.Subscriptions[0].Topic)
+				require.Equal(t, arguments, response.Subscriptions[0].Arguments)
 
 				return websocket.ErrCloseSent
 			}).

From 0e3d1cfebfac08fb2ddfe6f3f8cff4a15c0edd47 Mon Sep 17 00:00:00 2001
From: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Date: Wed, 15 Jan 2025 14:17:24 +0200
Subject: [PATCH 3/9] Reverted changes from
 illia-malachyn/6845-unify-subscription-and-message-id to avoid pendency on it

---
 engine/access/rest/websockets/controller.go   |  98 +++++------
 .../access/rest/websockets/controller_test.go | 161 +++++++++++-------
 .../account_statuses_provider.go              |   3 -
 .../account_statuses_provider_test.go         |  11 +-
 .../data_providers/base_provider.go           |  29 ++--
 .../data_providers/block_digests_provider.go  |   3 -
 .../block_digests_provider_test.go            |   3 +-
 .../data_providers/block_headers_provider.go  |   3 -
 .../block_headers_provider_test.go            |   3 +-
 .../data_providers/blocks_provider.go         |   3 -
 .../data_providers/blocks_provider_test.go    |   3 +-
 .../data_providers/data_provider.go           |   2 +-
 .../data_providers/events_provider.go         |   3 -
 .../data_providers/events_provider_test.go    |  11 +-
 .../rest/websockets/data_providers/factory.go |  18 +-
 .../websockets/data_providers/factory_test.go |   5 +-
 .../mock/data_provider_factory.go             |  20 +--
 ...d_and_get_transaction_statuses_provider.go |   3 -
 ..._get_transaction_statuses_provider_test.go |   2 -
 .../transaction_statuses_provider.go          |   3 -
 .../transaction_statuses_provider_test.go     |   3 -
 .../websockets/data_providers/unit_test.go    |   8 +-
 engine/access/rest/websockets/error_codes.go  |  10 ++
 .../rest/websockets/models/account_models.go  |   2 +-
 .../rest/websockets/models/base_message.go    |  17 +-
 .../rest/websockets/models/error_message.go   |   7 +
 .../websockets/models/list_subscriptions.go   |   6 +-
 .../websockets/models/subscribe_message.go    |   2 +-
 .../websockets/models/subscription_entry.go   |   6 +-
 .../websockets/models/unsubscribe_message.go  |   2 +-
 30 files changed, 206 insertions(+), 244 deletions(-)
 create mode 100644 engine/access/rest/websockets/error_codes.go
 create mode 100644 engine/access/rest/websockets/models/error_message.go

diff --git a/engine/access/rest/websockets/controller.go b/engine/access/rest/websockets/controller.go
index 75f8b764f8e..26c1c977cbf 100644
--- a/engine/access/rest/websockets/controller.go
+++ b/engine/access/rest/websockets/controller.go
@@ -77,7 +77,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"net/http"
 	"sync"
 	"time"
 
@@ -247,7 +246,7 @@ func (c *Controller) keepalive(ctx context.Context) error {
 // If no messages are sent within InactivityTimeout and no active data providers exist,
 // the connection will be closed.
 func (c *Controller) writeMessages(ctx context.Context) error {
-	inactivityTicker := time.NewTicker(c.inactivityTickerPeriod())
+	inactivityTicker := time.NewTicker(c.config.InactivityTimeout / 10)
 	defer inactivityTicker.Stop()
 
 	lastMessageSentAt := time.Now()
@@ -302,10 +301,6 @@ func (c *Controller) writeMessages(ctx context.Context) error {
 	}
 }
 
-func (c *Controller) inactivityTickerPeriod() time.Duration {
-	return c.config.InactivityTimeout / 10
-}
-
 // readMessages continuously reads messages from a client WebSocket connection,
 // validates each message, and processes it based on the message type.
 func (c *Controller) readMessages(ctx context.Context) error {
@@ -319,8 +314,7 @@ func (c *Controller) readMessages(ctx context.Context) error {
 			c.writeErrorResponse(
 				ctx,
 				err,
-				wrapErrorMessage(http.StatusBadRequest, "error reading message", ""),
-			)
+				wrapErrorMessage(InvalidMessage, "error reading message", "", "", ""))
 			continue
 		}
 
@@ -329,8 +323,7 @@ func (c *Controller) readMessages(ctx context.Context) error {
 			c.writeErrorResponse(
 				ctx,
 				err,
-				wrapErrorMessage(http.StatusBadRequest, "error parsing message", ""),
-			)
+				wrapErrorMessage(InvalidMessage, "error parsing message", "", "", ""))
 			continue
 		}
 	}
@@ -373,32 +366,24 @@ func (c *Controller) handleMessage(ctx context.Context, message json.RawMessage)
 }
 
 func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMessageRequest) {
-	subscriptionID, err := c.parseOrCreateSubscriptionID(msg.SubscriptionID)
-	if err != nil {
-		c.writeErrorResponse(
-			ctx,
-			err,
-			wrapErrorMessage(http.StatusBadRequest, "error parsing subscription id", msg.SubscriptionID),
-		)
-		return
-	}
-
 	// register new provider
-	provider, err := c.dataProviderFactory.NewDataProvider(ctx, subscriptionID, msg.Topic, msg.Arguments, c.multiplexedStream)
+	provider, err := c.dataProviderFactory.NewDataProvider(ctx, msg.Topic, msg.Arguments, c.multiplexedStream)
 	if err != nil {
 		c.writeErrorResponse(
 			ctx,
 			err,
-			wrapErrorMessage(http.StatusBadRequest, "error creating data provider", subscriptionID.String()),
+			wrapErrorMessage(InvalidArgument, "error creating data provider", msg.ClientMessageID, models.SubscribeAction, ""),
 		)
 		return
 	}
-	c.dataProviders.Add(subscriptionID, provider)
+	c.dataProviders.Add(provider.ID(), provider)
 
 	// write OK response to client
 	responseOk := models.SubscribeMessageResponse{
 		BaseMessageResponse: models.BaseMessageResponse{
-			SubscriptionID: subscriptionID.String(),
+			ClientMessageID: msg.ClientMessageID,
+			Success:         true,
+			SubscriptionID:  provider.ID().String(),
 		},
 	}
 	c.writeResponse(ctx, responseOk)
@@ -411,65 +396,73 @@ func (c *Controller) handleSubscribe(ctx context.Context, msg models.SubscribeMe
 			c.writeErrorResponse(
 				ctx,
 				err,
-				wrapErrorMessage(http.StatusInternalServerError, "internal error", subscriptionID.String()),
+				wrapErrorMessage(SubscriptionError, "subscription finished with error", "", "", ""),
 			)
 		}
 
 		c.dataProvidersGroup.Done()
-		c.dataProviders.Remove(subscriptionID)
+		c.dataProviders.Remove(provider.ID())
 	}()
 }
 
 func (c *Controller) handleUnsubscribe(ctx context.Context, msg models.UnsubscribeMessageRequest) {
-	subscriptionID, err := uuid.Parse(msg.SubscriptionID)
+	id, err := uuid.Parse(msg.SubscriptionID)
 	if err != nil {
 		c.writeErrorResponse(
 			ctx,
 			err,
-			wrapErrorMessage(http.StatusBadRequest, "error parsing subscription id", msg.SubscriptionID),
+			wrapErrorMessage(InvalidArgument, "error parsing subscription ID", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
 		)
 		return
 	}
 
-	provider, ok := c.dataProviders.Get(subscriptionID)
+	provider, ok := c.dataProviders.Get(id)
 	if !ok {
 		c.writeErrorResponse(
 			ctx,
 			err,
-			wrapErrorMessage(http.StatusNotFound, "subscription not found", subscriptionID.String()),
+			wrapErrorMessage(NotFound, "subscription not found", msg.ClientMessageID, models.UnsubscribeAction, msg.SubscriptionID),
 		)
 		return
 	}
 
 	provider.Close()
-	c.dataProviders.Remove(subscriptionID)
+	c.dataProviders.Remove(id)
 
 	responseOk := models.UnsubscribeMessageResponse{
 		BaseMessageResponse: models.BaseMessageResponse{
-			SubscriptionID: subscriptionID.String(),
+			ClientMessageID: msg.ClientMessageID,
+			Success:         true,
+			SubscriptionID:  msg.SubscriptionID,
 		},
 	}
 	c.writeResponse(ctx, responseOk)
 }
 
-func (c *Controller) handleListSubscriptions(ctx context.Context, _ models.ListSubscriptionsMessageRequest) {
+func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.ListSubscriptionsMessageRequest) {
 	var subs []*models.SubscriptionEntry
 	err := c.dataProviders.ForEach(func(id uuid.UUID, provider dp.DataProvider) error {
 		subs = append(subs, &models.SubscriptionEntry{
-			SubscriptionID: id.String(),
-			Topic:          provider.Topic(),
-			Arguments:      provider.Arguments(),
+			ID:        id.String(),
+			Topic:     provider.Topic(),
+			Arguments: provider.Arguments(),
 		})
 		return nil
 	})
 
-	// intentionally ignored, this never happens
 	if err != nil {
-		c.logger.Debug().Err(err).Msg("error listing subscriptions")
+		c.writeErrorResponse(
+			ctx,
+			err,
+			wrapErrorMessage(NotFound, "error listing subscriptions", msg.ClientMessageID, models.ListSubscriptionsAction, ""),
+		)
+		return
 	}
 
 	responseOk := models.ListSubscriptionsMessageResponse{
-		Subscriptions: subs,
+		Success:         true,
+		ClientMessageID: msg.ClientMessageID,
+		Subscriptions:   subs,
 	}
 	c.writeResponse(ctx, responseOk)
 }
@@ -506,30 +499,15 @@ func (c *Controller) writeResponse(ctx context.Context, response interface{}) {
 	}
 }
 
-func wrapErrorMessage(code int, message string, subscriptionID string) models.BaseMessageResponse {
+func wrapErrorMessage(code Code, message string, msgId string, action string, subscriptionID string) models.BaseMessageResponse {
 	return models.BaseMessageResponse{
-		SubscriptionID: subscriptionID,
+		ClientMessageID: msgId,
+		Success:         false,
+		SubscriptionID:  subscriptionID,
 		Error: models.ErrorMessage{
-			Code:    code,
+			Code:    int(code),
 			Message: message,
+			Action:  action,
 		},
 	}
 }
-
-func (c *Controller) parseOrCreateSubscriptionID(id string) (uuid.UUID, error) {
-	// if client didn't provide subscription id, we create one for them
-	if id == "" {
-		return uuid.New(), nil
-	}
-
-	newID, err := uuid.Parse(id)
-	if err != nil {
-		return uuid.Nil, err
-	}
-
-	if c.dataProviders.Has(newID) {
-		return uuid.Nil, fmt.Errorf("subscription id is already in use")
-	}
-
-	return newID, nil
-}
diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go
index c6a9650cc90..6e0c618879f 100644
--- a/engine/access/rest/websockets/controller_test.go
+++ b/engine/access/rest/websockets/controller_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"net/http"
 	"testing"
 	"time"
 
@@ -52,11 +51,14 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
 		done := make(chan struct{})
+
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -69,8 +71,8 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 
 		request := models.SubscribeMessageRequest{
 			BaseMessageRequest: models.BaseMessageRequest{
-				SubscriptionID: uuid.New().String(),
-				Action:         models.SubscribeAction,
+				ClientMessageID: uuid.New().String(),
+				Action:          models.SubscribeAction,
 			},
 			Topic:     dp.BlocksTopic,
 			Arguments: nil,
@@ -96,7 +98,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 
 				response, ok := msg.(models.SubscribeMessageResponse)
 				require.True(t, ok)
-				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
+				require.True(t, response.Success)
+				require.Equal(t, request.ClientMessageID, response.ClientMessageID)
+				require.Equal(t, id.String(), response.SubscriptionID)
 
 				return websocket.ErrCloseSent
 			})
@@ -144,8 +148,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 
 				response, ok := msg.(models.BaseMessageResponse)
 				require.True(t, ok)
+				require.False(t, response.Success)
 				require.NotEmpty(t, response.Error)
-				require.Equal(t, http.StatusBadRequest, response.Error.Code)
+				require.Equal(t, int(InvalidMessage), response.Error.Code)
 				return websocket.ErrCloseSent
 			})
 
@@ -164,13 +169,12 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(nil, fmt.Errorf("error creating data provider")).
 			Once()
 
 		done := make(chan struct{})
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
+		s.expectSubscribeRequest(t, conn)
 
 		conn.
 			On("WriteJSON", mock.Anything).
@@ -179,8 +183,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 
 				response, ok := msg.(models.BaseMessageResponse)
 				require.True(t, ok)
+				require.False(t, response.Success)
 				require.NotEmpty(t, response.Error)
-				require.Equal(t, http.StatusBadRequest, response.Error.Code)
+				require.Equal(t, int(InvalidArgument), response.Error.Code)
 
 				return websocket.ErrCloseSent
 			})
@@ -199,6 +204,7 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 		conn, dataProviderFactory, dataProvider := newControllerMocks(t)
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
+		dataProvider.On("ID").Return(uuid.New())
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -208,14 +214,13 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 			Once()
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
 		done := make(chan struct{})
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		conn.
 			On("WriteJSON", mock.Anything).
@@ -224,8 +229,9 @@ func (s *WsControllerSuite) TestSubscribeRequest() {
 
 				response, ok := msg.(models.BaseMessageResponse)
 				require.True(t, ok)
+				require.False(t, response.Success)
 				require.NotEmpty(t, response.Error)
-				require.Equal(t, http.StatusInternalServerError, response.Error.Code)
+				require.Equal(t, int(SubscriptionError), response.Error.Code)
 
 				return websocket.ErrCloseSent
 			})
@@ -248,11 +254,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
 		done := make(chan struct{})
+
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -263,15 +272,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 			Return(nil).
 			Once()
 
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		request := models.UnsubscribeMessageRequest{
 			BaseMessageRequest: models.BaseMessageRequest{
-				SubscriptionID: subscriptionID,
-				Action:         models.UnsubscribeAction,
+				ClientMessageID: uuid.New().String(),
+				Action:          models.UnsubscribeAction,
 			},
+			SubscriptionID: id.String(),
 		}
 		requestJson, err := json.Marshal(request)
 		require.NoError(t, err)
@@ -293,8 +302,9 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 
 				response, ok := msg.(models.UnsubscribeMessageResponse)
 				require.True(t, ok)
+				require.True(t, response.Success)
 				require.Empty(t, response.Error)
-				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
+				require.Equal(t, request.ClientMessageID, response.ClientMessageID)
 				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
 
 				return websocket.ErrCloseSent
@@ -317,11 +327,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
 		done := make(chan struct{})
+
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -332,15 +345,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 			Return(nil).
 			Once()
 
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		request := models.UnsubscribeMessageRequest{
 			BaseMessageRequest: models.BaseMessageRequest{
-				SubscriptionID: "invalid-uuid",
-				Action:         models.UnsubscribeAction,
+				ClientMessageID: uuid.New().String(),
+				Action:          models.UnsubscribeAction,
 			},
+			SubscriptionID: "invalid-uuid",
 		}
 		requestJson, err := json.Marshal(request)
 		require.NoError(t, err)
@@ -362,9 +375,10 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 
 				response, ok := msg.(models.BaseMessageResponse)
 				require.True(t, ok)
+				require.False(t, response.Success)
 				require.NotEmpty(t, response.Error)
-				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
-				require.Equal(t, http.StatusBadRequest, response.Error.Code)
+				require.Equal(t, request.ClientMessageID, response.ClientMessageID)
+				require.Equal(t, int(InvalidArgument), response.Error.Code)
 
 				return websocket.ErrCloseSent
 			}).
@@ -386,11 +400,14 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
 		done := make(chan struct{})
+
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 		dataProvider.
@@ -401,15 +418,15 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 			Return(nil).
 			Once()
 
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		request := models.UnsubscribeMessageRequest{
 			BaseMessageRequest: models.BaseMessageRequest{
-				SubscriptionID: uuid.New().String(), // unknown subscription id
-				Action:         models.UnsubscribeAction,
+				ClientMessageID: uuid.New().String(),
+				Action:          models.UnsubscribeAction,
 			},
+			SubscriptionID: uuid.New().String(),
 		}
 		requestJson, err := json.Marshal(request)
 		require.NoError(t, err)
@@ -431,10 +448,11 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 
 				response, ok := msg.(models.BaseMessageResponse)
 				require.True(t, ok)
-				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
-
+				require.False(t, response.Success)
 				require.NotEmpty(t, response.Error)
-				require.Equal(t, http.StatusNotFound, response.Error.Code)
+
+				require.Equal(t, request.ClientMessageID, response.ClientMessageID)
+				require.Equal(t, int(NotFound), response.Error.Code)
 
 				return websocket.ErrCloseSent
 			}).
@@ -457,14 +475,16 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
 		done := make(chan struct{})
 
+		id := uuid.New()
 		topic := dp.BlocksTopic
 		arguments := models.Arguments{}
+		dataProvider.On("ID").Return(id)
 		dataProvider.On("Topic").Return(topic)
 		dataProvider.On("Arguments").Return(arguments)
 		// data provider might finish on its own or controller will close it via Close()
@@ -477,14 +497,13 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 			Return(nil).
 			Once()
 
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		request := models.ListSubscriptionsMessageRequest{
 			BaseMessageRequest: models.BaseMessageRequest{
-				SubscriptionID: "",
-				Action:         models.ListSubscriptionsAction,
+				ClientMessageID: uuid.New().String(),
+				Action:          models.ListSubscriptionsAction,
 			},
 		}
 		requestJson, err := json.Marshal(request)
@@ -507,8 +526,11 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 
 				response, ok := msg.(models.ListSubscriptionsMessageResponse)
 				require.True(t, ok)
+				require.True(t, response.Success)
+				require.Empty(t, response.Error)
+				require.Equal(t, request.ClientMessageID, response.ClientMessageID)
 				require.Equal(t, 1, len(response.Subscriptions))
-				require.Equal(t, subscriptionID, response.Subscriptions[0].SubscriptionID)
+				require.Equal(t, id.String(), response.Subscriptions[0].ID)
 				require.Equal(t, topic, response.Subscriptions[0].Topic)
 				require.Equal(t, arguments, response.Subscriptions[0].Arguments)
 
@@ -535,10 +557,12 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 
@@ -553,9 +577,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
 			Once()
 
 		done := make(chan struct{})
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		// Expect a valid block to be passed to WriteJSON.
 		// If we got to this point, the controller executed all its logic properly
@@ -589,10 +612,12 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 
@@ -609,9 +634,8 @@ func (s *WsControllerSuite) TestSubscribeBlocks() {
 			Once()
 
 		done := make(chan struct{})
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		i := 0
 		actualBlocks := make([]*flow.Block, len(expectedBlocks))
@@ -793,10 +817,12 @@ func (s *WsControllerSuite) TestControllerShutdown() {
 		controller := NewWebSocketController(s.logger, s.wsConfig, conn, dataProviderFactory)
 
 		dataProviderFactory.
-			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			On("NewDataProvider", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
 			Return(dataProvider, nil).
 			Once()
 
+		id := uuid.New()
+		dataProvider.On("ID").Return(id)
 		// data provider might finish on its own or controller will close it via Close()
 		dataProvider.On("Close").Return(nil).Maybe()
 
@@ -809,9 +835,8 @@ func (s *WsControllerSuite) TestControllerShutdown() {
 			Once()
 
 		done := make(chan struct{})
-		subscriptionID := uuid.New().String()
-		s.expectSubscribeRequest(t, conn, subscriptionID)
-		s.expectSubscribeResponse(t, conn, subscriptionID)
+		msgID := s.expectSubscribeRequest(t, conn)
+		s.expectSubscribeResponse(t, conn, msgID)
 
 		conn.
 			On("WriteJSON", mock.Anything).
@@ -871,14 +896,15 @@ func (s *WsControllerSuite) TestControllerShutdown() {
 		conn.
 			On("ReadJSON", mock.Anything).
 			Return(func(interface{}) error {
-				// make sure the reader routine sleeps for more time than InactivityTimeout + inactivity ticker period.
-				// meanwhile, the writer routine must shut down the controller.
-				<-time.After(wsConfig.InactivityTimeout + controller.inactivityTickerPeriod()*2)
+				// waiting more than InactivityTimeout to make sure that read message routine busy and do not return
+				// an error before than inactivity tracker initiate shut down
+				<-time.After(wsConfig.InactivityTimeout)
 				return websocket.ErrCloseSent
 			}).
 			Once()
 
 		controller.HandleConnection(context.Background())
+		time.Sleep(wsConfig.InactivityTimeout)
 
 		conn.AssertExpectations(t)
 	})
@@ -970,11 +996,11 @@ func newControllerMocks(t *testing.T) (*connmock.WebsocketConnection, *dpmock.Da
 }
 
 // expectSubscribeRequest mocks the client's subscription request.
-func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock.WebsocketConnection, subscriptionID string) {
+func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock.WebsocketConnection) string {
 	request := models.SubscribeMessageRequest{
 		BaseMessageRequest: models.BaseMessageRequest{
-			SubscriptionID: subscriptionID,
-			Action:         models.SubscribeAction,
+			ClientMessageID: uuid.New().String(),
+			Action:          models.SubscribeAction,
 		},
 		Topic: dp.BlocksTopic,
 	}
@@ -991,16 +1017,19 @@ func (s *WsControllerSuite) expectSubscribeRequest(t *testing.T, conn *connmock.
 		}).
 		Return(nil).
 		Once()
+
+	return request.ClientMessageID
 }
 
 // expectSubscribeResponse mocks the subscription response sent to the client.
-func (s *WsControllerSuite) expectSubscribeResponse(t *testing.T, conn *connmock.WebsocketConnection, subscriptionID string) {
+func (s *WsControllerSuite) expectSubscribeResponse(t *testing.T, conn *connmock.WebsocketConnection, msgId string) {
 	conn.
 		On("WriteJSON", mock.Anything).
 		Run(func(args mock.Arguments) {
 			response, ok := args.Get(0).(models.SubscribeMessageResponse)
 			require.True(t, ok)
-			require.Equal(t, subscriptionID, response.SubscriptionID)
+			require.Equal(t, msgId, response.ClientMessageID)
+			require.Equal(t, true, response.Success)
 		}).
 		Return(nil).
 		Once()
diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider.go b/engine/access/rest/websockets/data_providers/account_statuses_provider.go
index 3615fc297fd..d2b368a33b0 100644
--- a/engine/access/rest/websockets/data_providers/account_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/account_statuses_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -42,7 +41,6 @@ func NewAccountStatusesDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	stateStreamApi state_stream.API,
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	send chan<- interface{},
@@ -65,7 +63,6 @@ func NewAccountStatusesDataProvider(
 	subCtx, cancel := context.WithCancel(ctx)
 
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go
index 87b177d33f4..157c50e7deb 100644
--- a/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/account_statuses_provider_test.go
@@ -4,9 +4,7 @@ import (
 	"context"
 	"strconv"
 	"testing"
-	"time"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -178,7 +176,6 @@ func (s *AccountStatusesProviderSuite) TestAccountStatusesDataProvider_InvalidAr
 				ctx,
 				s.log,
 				s.api,
-				uuid.New(),
 				topic,
 				test.arguments,
 				send,
@@ -206,7 +203,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
 	// Create a mock subscription and mock the channel
 	sub := ssmock.NewSubscription(s.T())
 	sub.On("Channel").Return((<-chan interface{})(accountStatusesChan))
-	sub.On("Err").Return(nil).Once()
+	sub.On("Err").Return(nil)
 
 	s.api.On("SubscribeAccountStatusesFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub)
 
@@ -220,7 +217,6 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
 		ctx,
 		s.log,
 		s.api,
-		uuid.New(),
 		topic,
 		arguments,
 		send,
@@ -235,9 +231,7 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
 	defer provider.Close()
 
 	// Run the provider in a separate goroutine to simulate subscription processing
-	done := make(chan struct{})
 	go func() {
-		defer close(done)
 		err = provider.Run()
 		s.Require().NoError(err)
 	}()
@@ -260,9 +254,6 @@ func (s *AccountStatusesProviderSuite) TestMessageIndexAccountStatusesProviderRe
 		responses = append(responses, accountStatusesRes)
 	}
 
-	// Wait for the provider goroutine to finish
-	unittest.RequireCloseBefore(s.T(), done, time.Second, "provider failed to stop")
-
 	// Verifying that indices are starting from 0
 	s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0")
 
diff --git a/engine/access/rest/websockets/data_providers/base_provider.go b/engine/access/rest/websockets/data_providers/base_provider.go
index 71a73e49fc9..9387785a2d6 100644
--- a/engine/access/rest/websockets/data_providers/base_provider.go
+++ b/engine/access/rest/websockets/data_providers/base_provider.go
@@ -11,17 +11,16 @@ import (
 
 // baseDataProvider holds common objects for the provider
 type baseDataProvider struct {
-	subscriptionID uuid.UUID
-	topic          string
-	arguments      models.Arguments
-	cancel         context.CancelFunc
-	send           chan<- interface{}
-	subscription   subscription.Subscription
+	id           uuid.UUID
+	topic        string
+	arguments    models.Arguments
+	cancel       context.CancelFunc
+	send         chan<- interface{}
+	subscription subscription.Subscription
 }
 
 // newBaseDataProvider creates a new instance of baseDataProvider.
 func newBaseDataProvider(
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	cancel context.CancelFunc,
@@ -29,18 +28,18 @@ func newBaseDataProvider(
 	subscription subscription.Subscription,
 ) *baseDataProvider {
 	return &baseDataProvider{
-		subscriptionID: subscriptionID,
-		topic:          topic,
-		arguments:      arguments,
-		cancel:         cancel,
-		send:           send,
-		subscription:   subscription,
+		id:           uuid.New(),
+		topic:        topic,
+		arguments:    arguments,
+		cancel:       cancel,
+		send:         send,
+		subscription: subscription,
 	}
 }
 
-// ID returns the subscription ID associated with current data provider
+// ID returns the unique identifier of the data provider.
 func (b *baseDataProvider) ID() uuid.UUID {
-	return b.subscriptionID
+	return b.id
 }
 
 // Topic returns the topic associated with the data provider.
diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider.go b/engine/access/rest/websockets/data_providers/block_digests_provider.go
index 144f70751c2..8e08d9b95e1 100644
--- a/engine/access/rest/websockets/data_providers/block_digests_provider.go
+++ b/engine/access/rest/websockets/data_providers/block_digests_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 
 	"github.com/onflow/flow-go/access"
@@ -29,7 +28,6 @@ func NewBlockDigestsDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	api access.API,
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	send chan<- interface{},
@@ -47,7 +45,6 @@ func NewBlockDigestsDataProvider(
 
 	subCtx, cancel := context.WithCancel(ctx)
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/block_digests_provider_test.go b/engine/access/rest/websockets/data_providers/block_digests_provider_test.go
index 6bc18ee0650..57576fe60a0 100644
--- a/engine/access/rest/websockets/data_providers/block_digests_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/block_digests_provider_test.go
@@ -5,7 +5,6 @@ import (
 	"strconv"
 	"testing"
 
-	"github.com/google/uuid"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -44,7 +43,7 @@ func (s *BlockDigestsProviderSuite) TestBlockDigestsDataProvider_InvalidArgument
 
 	for _, test := range s.invalidArgumentsTestCases() {
 		s.Run(test.name, func() {
-			provider, err := NewBlockDigestsDataProvider(ctx, s.log, s.api, uuid.New(), topic, test.arguments, send)
+			provider, err := NewBlockDigestsDataProvider(ctx, s.log, s.api, topic, test.arguments, send)
 			s.Require().Nil(provider)
 			s.Require().Error(err)
 			s.Require().Contains(err.Error(), test.expectedErrorMsg)
diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider.go b/engine/access/rest/websockets/data_providers/block_headers_provider.go
index fe378c9e2ae..a2e74c1ad2a 100644
--- a/engine/access/rest/websockets/data_providers/block_headers_provider.go
+++ b/engine/access/rest/websockets/data_providers/block_headers_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 
 	"github.com/onflow/flow-go/access"
@@ -30,7 +29,6 @@ func NewBlockHeadersDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	api access.API,
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	send chan<- interface{},
@@ -48,7 +46,6 @@ func NewBlockHeadersDataProvider(
 
 	subCtx, cancel := context.WithCancel(ctx)
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/block_headers_provider_test.go b/engine/access/rest/websockets/data_providers/block_headers_provider_test.go
index f2b844117eb..9f71ae124f2 100644
--- a/engine/access/rest/websockets/data_providers/block_headers_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/block_headers_provider_test.go
@@ -5,7 +5,6 @@ import (
 	"strconv"
 	"testing"
 
-	"github.com/google/uuid"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
@@ -45,7 +44,7 @@ func (s *BlockHeadersProviderSuite) TestBlockHeadersDataProvider_InvalidArgument
 
 	for _, test := range s.invalidArgumentsTestCases() {
 		s.Run(test.name, func() {
-			provider, err := NewBlockHeadersDataProvider(ctx, s.log, s.api, uuid.New(), topic, test.arguments, send)
+			provider, err := NewBlockHeadersDataProvider(ctx, s.log, s.api, topic, test.arguments, send)
 			s.Require().Nil(provider)
 			s.Require().Error(err)
 			s.Require().Contains(err.Error(), test.expectedErrorMsg)
diff --git a/engine/access/rest/websockets/data_providers/blocks_provider.go b/engine/access/rest/websockets/data_providers/blocks_provider.go
index bc98b03a314..b8fc0cfbf99 100644
--- a/engine/access/rest/websockets/data_providers/blocks_provider.go
+++ b/engine/access/rest/websockets/data_providers/blocks_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 
 	"github.com/onflow/flow-go/access"
@@ -41,7 +40,6 @@ func NewBlocksDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	api access.API,
-	subscriptionID uuid.UUID,
 	linkGenerator commonmodels.LinkGenerator,
 	topic string,
 	arguments models.Arguments,
@@ -62,7 +60,6 @@ func NewBlocksDataProvider(
 
 	subCtx, cancel := context.WithCancel(ctx)
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/blocks_provider_test.go b/engine/access/rest/websockets/data_providers/blocks_provider_test.go
index d75e7fb32aa..e73dc99df1d 100644
--- a/engine/access/rest/websockets/data_providers/blocks_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/blocks_provider_test.go
@@ -6,7 +6,6 @@ import (
 	"strconv"
 	"testing"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -132,7 +131,7 @@ func (s *BlocksProviderSuite) TestBlocksDataProvider_InvalidArguments() {
 
 	for _, test := range s.invalidArgumentsTestCases() {
 		s.Run(test.name, func() {
-			provider, err := NewBlocksDataProvider(ctx, s.log, s.api, uuid.New(), nil, BlocksTopic, test.arguments, send)
+			provider, err := NewBlocksDataProvider(ctx, s.log, s.api, nil, BlocksTopic, test.arguments, send)
 			s.Require().Nil(provider)
 			s.Require().Error(err)
 			s.Require().Contains(err.Error(), test.expectedErrorMsg)
diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go
index 5ebe0c4f20b..cfa29853908 100644
--- a/engine/access/rest/websockets/data_providers/data_provider.go
+++ b/engine/access/rest/websockets/data_providers/data_provider.go
@@ -7,7 +7,7 @@ import (
 )
 
 // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector.
-// It provides methods for retrieving the provider's unique SubscriptionID, topic, and a methods to close and run the provider.
+// It provides methods for retrieving the provider's unique ID, topic, and a methods to close and run the provider.
 type DataProvider interface {
 	// ID returns the unique identifier of the data provider.
 	ID() uuid.UUID
diff --git a/engine/access/rest/websockets/data_providers/events_provider.go b/engine/access/rest/websockets/data_providers/events_provider.go
index 9a9ce8c5edf..aa08ad71b87 100644
--- a/engine/access/rest/websockets/data_providers/events_provider.go
+++ b/engine/access/rest/websockets/data_providers/events_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 
 	"github.com/onflow/flow-go/engine/access/rest/common/parser"
@@ -41,7 +40,6 @@ func NewEventsDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	stateStreamApi state_stream.API,
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	send chan<- interface{},
@@ -64,7 +62,6 @@ func NewEventsDataProvider(
 	subCtx, cancel := context.WithCancel(ctx)
 
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/events_provider_test.go b/engine/access/rest/websockets/data_providers/events_provider_test.go
index 4089f23ea8c..ab9f6110820 100644
--- a/engine/access/rest/websockets/data_providers/events_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/events_provider_test.go
@@ -5,9 +5,7 @@ import (
 	"fmt"
 	"strconv"
 	"testing"
-	"time"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -204,7 +202,6 @@ func (s *EventsProviderSuite) TestEventsDataProvider_InvalidArguments() {
 				ctx,
 				s.log,
 				s.api,
-				uuid.New(),
 				topic,
 				test.arguments,
 				send,
@@ -232,7 +229,7 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
 	// Create a mock subscription and mock the channel
 	sub := ssmock.NewSubscription(s.T())
 	sub.On("Channel").Return((<-chan interface{})(eventChan))
-	sub.On("Err").Return(nil).Once()
+	sub.On("Err").Return(nil)
 
 	s.api.On("SubscribeEventsFromStartBlockID", mock.Anything, mock.Anything, mock.Anything).Return(sub)
 
@@ -246,7 +243,6 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
 		ctx,
 		s.log,
 		s.api,
-		uuid.New(),
 		topic,
 		arguments,
 		send,
@@ -262,9 +258,7 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
 	defer provider.Close()
 
 	// Run the provider in a separate goroutine to simulate subscription processing
-	done := make(chan struct{})
 	go func() {
-		defer close(done)
 		err = provider.Run()
 		s.Require().NoError(err)
 	}()
@@ -289,9 +283,6 @@ func (s *EventsProviderSuite) TestMessageIndexEventProviderResponse_HappyPath()
 		responses = append(responses, eventRes)
 	}
 
-	// Wait for the provider goroutine to finish
-	unittest.RequireCloseBefore(s.T(), done, time.Second, "provider failed to stop")
-
 	// Verifying that indices are starting from 1
 	s.Require().Equal(uint64(0), responses[0].MessageIndex, "Expected MessageIndex to start with 0")
 
diff --git a/engine/access/rest/websockets/data_providers/factory.go b/engine/access/rest/websockets/data_providers/factory.go
index 38f5fda6da1..cb70b8bcb79 100644
--- a/engine/access/rest/websockets/data_providers/factory.go
+++ b/engine/access/rest/websockets/data_providers/factory.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 
 	"github.com/onflow/flow-go/access"
@@ -34,7 +33,7 @@ type DataProviderFactory interface {
 	// and configuration parameters.
 	//
 	// No errors are expected during normal operations.
-	NewDataProvider(ctx context.Context, subscriptionID uuid.UUID, topic string, args models.Arguments, ch chan<- interface{}) (DataProvider, error)
+	NewDataProvider(ctx context.Context, topic string, args models.Arguments, ch chan<- interface{}) (DataProvider, error)
 }
 
 var _ DataProviderFactory = (*DataProviderFactoryImpl)(nil)
@@ -94,26 +93,25 @@ func NewDataProviderFactory(
 // No errors are expected during normal operations.
 func (s *DataProviderFactoryImpl) NewDataProvider(
 	ctx context.Context,
-	subscriptionID uuid.UUID,
 	topic string,
 	arguments models.Arguments,
 	ch chan<- interface{},
 ) (DataProvider, error) {
 	switch topic {
 	case BlocksTopic:
-		return NewBlocksDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch)
+		return NewBlocksDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch)
 	case BlockHeadersTopic:
-		return NewBlockHeadersDataProvider(ctx, s.logger, s.accessApi, subscriptionID, topic, arguments, ch)
+		return NewBlockHeadersDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
 	case BlockDigestsTopic:
-		return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, subscriptionID, topic, arguments, ch)
+		return NewBlockDigestsDataProvider(ctx, s.logger, s.accessApi, topic, arguments, ch)
 	case EventsTopic:
-		return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, subscriptionID, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
+		return NewEventsDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
 	case AccountStatusesTopic:
-		return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, subscriptionID, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
+		return NewAccountStatusesDataProvider(ctx, s.logger, s.stateStreamApi, topic, arguments, ch, s.chain, s.eventFilterConfig, s.heartbeatInterval)
 	case TransactionStatusesTopic:
-		return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch)
+		return NewTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch)
 	case SendAndGetTransactionStatusesTopic:
-		return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, subscriptionID, s.linkGenerator, topic, arguments, ch)
+		return NewSendAndGetTransactionStatusesDataProvider(ctx, s.logger, s.accessApi, s.linkGenerator, topic, arguments, ch)
 	default:
 		return nil, fmt.Errorf("unsupported topic \"%s\"", topic)
 	}
diff --git a/engine/access/rest/websockets/data_providers/factory_test.go b/engine/access/rest/websockets/data_providers/factory_test.go
index a15f58443e1..d3378b527ea 100644
--- a/engine/access/rest/websockets/data_providers/factory_test.go
+++ b/engine/access/rest/websockets/data_providers/factory_test.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"testing"
 
-	"github.com/google/uuid"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/suite"
 
@@ -161,7 +160,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
 			s.T().Parallel()
 			test.setupSubscription()
 
-			provider, err := s.factory.NewDataProvider(s.ctx, uuid.New(), test.topic, test.arguments, s.ch)
+			provider, err := s.factory.NewDataProvider(s.ctx, test.topic, test.arguments, s.ch)
 			s.Require().NotNil(provider, "Expected provider for topic %s", test.topic)
 			s.Require().NoError(err, "Expected no error for topic %s", test.topic)
 			s.Require().Equal(test.topic, provider.Topic())
@@ -184,7 +183,7 @@ func (s *DataProviderFactorySuite) TestUnsupportedTopics() {
 	}
 
 	for _, topic := range unsupportedTopics {
-		provider, err := s.factory.NewDataProvider(s.ctx, uuid.New(), topic, nil, s.ch)
+		provider, err := s.factory.NewDataProvider(s.ctx, topic, nil, s.ch)
 		s.Require().Nil(provider, "Expected no provider for unsupported topic %s", topic)
 		s.Require().Error(err, "Expected error for unsupported topic %s", topic)
 		s.Require().EqualError(err, fmt.Sprintf("unsupported topic \"%s\"", topic))
diff --git a/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go b/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go
index 7c7d4bc58c0..af49cb4e687 100644
--- a/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go
+++ b/engine/access/rest/websockets/data_providers/mock/data_provider_factory.go
@@ -9,8 +9,6 @@ import (
 	mock "github.com/stretchr/testify/mock"
 
 	models "github.com/onflow/flow-go/engine/access/rest/websockets/models"
-
-	uuid "github.com/google/uuid"
 )
 
 // DataProviderFactory is an autogenerated mock type for the DataProviderFactory type
@@ -18,9 +16,9 @@ type DataProviderFactory struct {
 	mock.Mock
 }
 
-// NewDataProvider provides a mock function with given fields: ctx, subscriptionID, topic, args, ch
-func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, subscriptionID uuid.UUID, topic string, args models.Arguments, ch chan<- interface{}) (data_providers.DataProvider, error) {
-	ret := _m.Called(ctx, subscriptionID, topic, args, ch)
+// NewDataProvider provides a mock function with given fields: ctx, topic, args, ch
+func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, topic string, args models.Arguments, ch chan<- interface{}) (data_providers.DataProvider, error) {
+	ret := _m.Called(ctx, topic, args, ch)
 
 	if len(ret) == 0 {
 		panic("no return value specified for NewDataProvider")
@@ -28,19 +26,19 @@ func (_m *DataProviderFactory) NewDataProvider(ctx context.Context, subscription
 
 	var r0 data_providers.DataProvider
 	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) (data_providers.DataProvider, error)); ok {
-		return rf(ctx, subscriptionID, topic, args, ch)
+	if rf, ok := ret.Get(0).(func(context.Context, string, models.Arguments, chan<- interface{}) (data_providers.DataProvider, error)); ok {
+		return rf(ctx, topic, args, ch)
 	}
-	if rf, ok := ret.Get(0).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) data_providers.DataProvider); ok {
-		r0 = rf(ctx, subscriptionID, topic, args, ch)
+	if rf, ok := ret.Get(0).(func(context.Context, string, models.Arguments, chan<- interface{}) data_providers.DataProvider); ok {
+		r0 = rf(ctx, topic, args, ch)
 	} else {
 		if ret.Get(0) != nil {
 			r0 = ret.Get(0).(data_providers.DataProvider)
 		}
 	}
 
-	if rf, ok := ret.Get(1).(func(context.Context, uuid.UUID, string, models.Arguments, chan<- interface{}) error); ok {
-		r1 = rf(ctx, subscriptionID, topic, args, ch)
+	if rf, ok := ret.Get(1).(func(context.Context, string, models.Arguments, chan<- interface{}) error); ok {
+		r1 = rf(ctx, topic, args, ch)
 	} else {
 		r1 = ret.Error(1)
 	}
diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
index e496792c9b2..9bfde0aef3a 100644
--- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -40,7 +39,6 @@ func NewSendAndGetTransactionStatusesDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	api access.API,
-	subscriptionID uuid.UUID,
 	linkGenerator commonmodels.LinkGenerator,
 	topic string,
 	arguments models.Arguments,
@@ -61,7 +59,6 @@ func NewSendAndGetTransactionStatusesDataProvider(
 	subCtx, cancel := context.WithCancel(ctx)
 
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go
index 75932ebde11..7fad80de987 100644
--- a/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/send_and_get_transaction_statuses_provider_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"testing"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -136,7 +135,6 @@ func (s *SendTransactionStatusesProviderSuite) TestSendTransactionStatusesDataPr
 				ctx,
 				s.log,
 				s.api,
-				uuid.New(),
 				s.linkGenerator,
 				topic,
 				test.arguments,
diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
index 87fca705b44..de667664769 100644
--- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
+++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -43,7 +42,6 @@ func NewTransactionStatusesDataProvider(
 	ctx context.Context,
 	logger zerolog.Logger,
 	api access.API,
-	subscriptionID uuid.UUID,
 	linkGenerator commonmodels.LinkGenerator,
 	topic string,
 	arguments models.Arguments,
@@ -64,7 +62,6 @@ func NewTransactionStatusesDataProvider(
 	subCtx, cancel := context.WithCancel(ctx)
 
 	p.baseDataProvider = newBaseDataProvider(
-		subscriptionID,
 		topic,
 		arguments,
 		cancel,
diff --git a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go
index fa05becd429..72101f769f7 100644
--- a/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go
+++ b/engine/access/rest/websockets/data_providers/transaction_statuses_provider_test.go
@@ -7,7 +7,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/google/uuid"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
@@ -188,7 +187,6 @@ func (s *TransactionStatusesProviderSuite) TestTransactionStatusesDataProvider_I
 				ctx,
 				s.log,
 				s.api,
-				uuid.New(),
 				s.linkGenerator,
 				topic,
 				test.arguments,
@@ -283,7 +281,6 @@ func (s *TransactionStatusesProviderSuite) TestMessageIndexTransactionStatusesPr
 		ctx,
 		s.log,
 		s.api,
-		uuid.New(),
 		s.linkGenerator,
 		topic,
 		arguments,
diff --git a/engine/access/rest/websockets/data_providers/unit_test.go b/engine/access/rest/websockets/data_providers/unit_test.go
index b3ad732caa3..94d9534798f 100644
--- a/engine/access/rest/websockets/data_providers/unit_test.go
+++ b/engine/access/rest/websockets/data_providers/unit_test.go
@@ -6,7 +6,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
 
 	"github.com/onflow/flow-go/engine/access/rest/websockets/models"
@@ -64,7 +63,7 @@ func testHappyPath(
 			test.setupBackend(sub)
 
 			// Create the data provider instance
-			provider, err := factory.NewDataProvider(ctx, uuid.New(), topic, test.arguments, send)
+			provider, err := factory.NewDataProvider(ctx, topic, test.arguments, send)
 
 			require.NotNil(t, provider)
 			require.NoError(t, err)
@@ -73,9 +72,7 @@ func testHappyPath(
 			defer provider.Close()
 
 			// Run the provider in a separate goroutine
-			done := make(chan struct{})
 			go func() {
-				defer close(done)
 				err = provider.Run()
 				require.NoError(t, err)
 			}()
@@ -86,9 +83,6 @@ func testHappyPath(
 				sendData(dataChan)
 			}()
 
-			// Wait for the provider goroutine to finish
-			unittest.RequireCloseBefore(t, done, time.Second, "provider failed to stop")
-
 			// Collect responses
 			for i, expected := range test.expectedResponses {
 				unittest.RequireReturnsBefore(t, func() {
diff --git a/engine/access/rest/websockets/error_codes.go b/engine/access/rest/websockets/error_codes.go
new file mode 100644
index 00000000000..fd206bed0b3
--- /dev/null
+++ b/engine/access/rest/websockets/error_codes.go
@@ -0,0 +1,10 @@
+package websockets
+
+type Code int
+
+const (
+	InvalidMessage Code = iota
+	InvalidArgument
+	NotFound
+	SubscriptionError
+)
diff --git a/engine/access/rest/websockets/models/account_models.go b/engine/access/rest/websockets/models/account_models.go
index bad5e155721..7ad7243a96c 100644
--- a/engine/access/rest/websockets/models/account_models.go
+++ b/engine/access/rest/websockets/models/account_models.go
@@ -2,7 +2,7 @@ package models
 
 // AccountStatusesResponse is the response message for 'events' topic.
 type AccountStatusesResponse struct {
-	BlockID       string        `json:"block_id"`
+	BlockID       string        `json:"blockID"`
 	Height        string        `json:"height"`
 	AccountEvents AccountEvents `json:"account_events"`
 	MessageIndex  uint64        `json:"message_index"`
diff --git a/engine/access/rest/websockets/models/base_message.go b/engine/access/rest/websockets/models/base_message.go
index dc26a2914b0..cdcd72eb1ed 100644
--- a/engine/access/rest/websockets/models/base_message.go
+++ b/engine/access/rest/websockets/models/base_message.go
@@ -8,19 +8,14 @@ const (
 
 // BaseMessageRequest represents a base structure for incoming messages.
 type BaseMessageRequest struct {
-	// SubscriptionID is UUID generated by either client or server to uniquely identify subscription.
-	// It is empty for 'list_subscription' action
-	SubscriptionID string `json:"subscription_id,omitempty"`
-	Action         string `json:"action"` // Action is an action to perform (e.g. 'subscribe' to some data)
+	Action          string `json:"action"`     // subscribe, unsubscribe or list_subscriptions
+	ClientMessageID string `json:"message_id"` // ClientMessageID is a uuid generated by client to identify request/response uniquely
 }
 
 // BaseMessageResponse represents a base structure for outgoing messages.
 type BaseMessageResponse struct {
-	SubscriptionID string       `json:"subscription_id"` // SubscriptionID might be empty in case of error response
-	Error          ErrorMessage `json:"error,omitempty"` // Error might be empty in case of OK response
-}
-
-type ErrorMessage struct {
-	Code    int    `json:"code"` // Code is an error code that categorizes an error
-	Message string `json:"message"`
+	SubscriptionID  string       `json:"subscription_id"`
+	ClientMessageID string       `json:"message_id,omitempty"` // ClientMessageID may be empty in case we send msg by ourselves (e.g. error occurred)
+	Success         bool         `json:"success"`
+	Error           ErrorMessage `json:"error,omitempty"`
 }
diff --git a/engine/access/rest/websockets/models/error_message.go b/engine/access/rest/websockets/models/error_message.go
new file mode 100644
index 00000000000..d5c0670926f
--- /dev/null
+++ b/engine/access/rest/websockets/models/error_message.go
@@ -0,0 +1,7 @@
+package models
+
+type ErrorMessage struct {
+	Code    int    `json:"code"`
+	Message string `json:"message"`
+	Action  string `json:"action,omitempty"`
+}
diff --git a/engine/access/rest/websockets/models/list_subscriptions.go b/engine/access/rest/websockets/models/list_subscriptions.go
index 185fc65bbe0..4893a34b09d 100644
--- a/engine/access/rest/websockets/models/list_subscriptions.go
+++ b/engine/access/rest/websockets/models/list_subscriptions.go
@@ -8,6 +8,8 @@ type ListSubscriptionsMessageRequest struct {
 // ListSubscriptionsMessageResponse is the structure used to respond to list_subscriptions requests.
 // It contains a list of active subscriptions for the current WebSocket connection.
 type ListSubscriptionsMessageResponse struct {
-	// Subscription list might be empty in case of no active subscriptions
-	Subscriptions []*SubscriptionEntry `json:"subscriptions"`
+	ClientMessageID string               `json:"message_id"`
+	Success         bool                 `json:"success"`
+	Error           ErrorMessage         `json:"error,omitempty"`
+	Subscriptions   []*SubscriptionEntry `json:"subscriptions,omitempty"`
 }
diff --git a/engine/access/rest/websockets/models/subscribe_message.go b/engine/access/rest/websockets/models/subscribe_message.go
index b4bd9e871da..532e4c6a987 100644
--- a/engine/access/rest/websockets/models/subscribe_message.go
+++ b/engine/access/rest/websockets/models/subscribe_message.go
@@ -6,7 +6,7 @@ type Arguments map[string]interface{}
 type SubscribeMessageRequest struct {
 	BaseMessageRequest
 	Topic     string    `json:"topic"`     // Topic to subscribe to
-	Arguments Arguments `json:"arguments"` // Arguments are the arguments for the subscribed topic
+	Arguments Arguments `json:"arguments"` // Additional arguments for subscription
 }
 
 // SubscribeMessageResponse represents the response to a subscription request.
diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go
index 9a60ab1a0d9..e48cdac15bb 100644
--- a/engine/access/rest/websockets/models/subscription_entry.go
+++ b/engine/access/rest/websockets/models/subscription_entry.go
@@ -2,7 +2,7 @@ package models
 
 // SubscriptionEntry represents an active subscription entry.
 type SubscriptionEntry struct {
-	SubscriptionID string    `json:"subscription_id"` // ID is a client generated UUID for subscription
-	Topic          string    `json:"topic"`           // Topic of the subscription
-	Arguments      Arguments `json:"arguments"`
+	Topic     string    `json:"topic,omitempty"` // Topic of the subscription
+	ID        string    `json:"id,omitempty"`    // Unique subscription ID
+	Arguments Arguments `json:"arguments"`       // Arguments of the subscription
 }
diff --git a/engine/access/rest/websockets/models/unsubscribe_message.go b/engine/access/rest/websockets/models/unsubscribe_message.go
index f72e6cb5c7b..1402189a601 100644
--- a/engine/access/rest/websockets/models/unsubscribe_message.go
+++ b/engine/access/rest/websockets/models/unsubscribe_message.go
@@ -2,8 +2,8 @@ package models
 
 // UnsubscribeMessageRequest represents a request to unsubscribe from a topic.
 type UnsubscribeMessageRequest struct {
-	// Note: subscription_id is mandatory for this request
 	BaseMessageRequest
+	SubscriptionID string `json:"id"`
 }
 
 // UnsubscribeMessageResponse represents the response to an unsubscription request.

From bacec0351c291dc68f4554edc58d10db0c34b421 Mon Sep 17 00:00:00 2001
From: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Date: Wed, 15 Jan 2025 14:19:50 +0200
Subject: [PATCH 4/9] Updated godoc

---
 engine/access/rest/websockets/data_providers/data_provider.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/engine/access/rest/websockets/data_providers/data_provider.go b/engine/access/rest/websockets/data_providers/data_provider.go
index cfa29853908..3dad24a3ff1 100644
--- a/engine/access/rest/websockets/data_providers/data_provider.go
+++ b/engine/access/rest/websockets/data_providers/data_provider.go
@@ -7,7 +7,7 @@ import (
 )
 
 // The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector.
-// It provides methods for retrieving the provider's unique ID, topic, and a methods to close and run the provider.
+// It provides methods for retrieving the provider's unique ID, topic, arguments and a methods to close and run the provider.
 type DataProvider interface {
 	// ID returns the unique identifier of the data provider.
 	ID() uuid.UUID

From b0c5ffa4ab7521dfaa2db958882bdff4f6000b59 Mon Sep 17 00:00:00 2001
From: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Date: Thu, 16 Jan 2025 16:25:57 +0200
Subject: [PATCH 5/9] Removed omitempty from ID and Topic for SubscriptionEntry

---
 engine/access/rest/websockets/models/subscription_entry.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go
index e48cdac15bb..6c88d80b59a 100644
--- a/engine/access/rest/websockets/models/subscription_entry.go
+++ b/engine/access/rest/websockets/models/subscription_entry.go
@@ -2,7 +2,7 @@ package models
 
 // SubscriptionEntry represents an active subscription entry.
 type SubscriptionEntry struct {
-	Topic     string    `json:"topic,omitempty"` // Topic of the subscription
-	ID        string    `json:"id,omitempty"`    // Unique subscription ID
-	Arguments Arguments `json:"arguments"`       // Arguments of the subscription
+	Topic     string    `json:"topic"`     // Topic of the subscription
+	ID        string    `json:"id"`        // Unique subscription ID
+	Arguments Arguments `json:"arguments"` // Arguments of the subscription
 }

From d2e11f903cf18c0c71cb76dd44b227efea1f1aab Mon Sep 17 00:00:00 2001
From: Andrii Slisarchuk <andriyslisarchuk@gmail.com>
Date: Thu, 23 Jan 2025 12:27:35 +0200
Subject: [PATCH 6/9] fixed remarks and tests

---
 engine/access/rest/websockets/controller_test.go          | 8 ++++++--
 .../access/rest/websockets/models/unsubscribe_message.go  | 2 +-
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/engine/access/rest/websockets/controller_test.go b/engine/access/rest/websockets/controller_test.go
index 60a01632d90..0f8b05ccd1f 100644
--- a/engine/access/rest/websockets/controller_test.go
+++ b/engine/access/rest/websockets/controller_test.go
@@ -8,6 +8,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/onflow/flow-go/engine/access/rest/common/parser"
+
 	"github.com/google/uuid"
 	"github.com/gorilla/websocket"
 	"github.com/rs/zerolog"
@@ -298,7 +300,6 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
 				require.True(t, ok)
 				require.Empty(t, response.Error)
 				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
-				require.Equal(t, request.SubscriptionID, response.SubscriptionID)
 
 				return websocket.ErrCloseSent
 			}).
@@ -470,7 +471,10 @@ func (s *WsControllerSuite) TestListSubscriptions() {
 		done := make(chan struct{})
 
 		topic := dp.BlocksTopic
-		arguments := models.Arguments{}
+		arguments := models.Arguments{
+			"start_block_id": unittest.IdentifierFixture().String(),
+			"block_status":   parser.Finalized,
+		}
 		dataProvider.On("Topic").Return(topic)
 		dataProvider.On("Arguments").Return(arguments)
 		// data provider might finish on its own or controller will close it via Close()
diff --git a/engine/access/rest/websockets/models/unsubscribe_message.go b/engine/access/rest/websockets/models/unsubscribe_message.go
index 1402189a601..f72e6cb5c7b 100644
--- a/engine/access/rest/websockets/models/unsubscribe_message.go
+++ b/engine/access/rest/websockets/models/unsubscribe_message.go
@@ -2,8 +2,8 @@ package models
 
 // UnsubscribeMessageRequest represents a request to unsubscribe from a topic.
 type UnsubscribeMessageRequest struct {
+	// Note: subscription_id is mandatory for this request
 	BaseMessageRequest
-	SubscriptionID string `json:"id"`
 }
 
 // UnsubscribeMessageResponse represents the response to an unsubscription request.

From 899b691b852ead3155dbd9a85d0ef15cd7238e70 Mon Sep 17 00:00:00 2001
From: Andrii Slisarchuk <andriyslisarchuk@gmail.com>
Date: Thu, 23 Jan 2025 12:29:43 +0200
Subject: [PATCH 7/9] remove error_codes.go

---
 engine/access/rest/websockets/error_codes.go | 10 ----------
 1 file changed, 10 deletions(-)
 delete mode 100644 engine/access/rest/websockets/error_codes.go

diff --git a/engine/access/rest/websockets/error_codes.go b/engine/access/rest/websockets/error_codes.go
deleted file mode 100644
index fd206bed0b3..00000000000
--- a/engine/access/rest/websockets/error_codes.go
+++ /dev/null
@@ -1,10 +0,0 @@
-package websockets
-
-type Code int
-
-const (
-	InvalidMessage Code = iota
-	InvalidArgument
-	NotFound
-	SubscriptionError
-)

From 93bb91542ea8c50e6ecb888bff4c18d63b8a1887 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Thu, 23 Jan 2025 09:45:53 -0800
Subject: [PATCH 8/9] Apply suggestions from code review

---
 engine/access/rest/websockets/models/account_models.go     | 2 +-
 engine/access/rest/websockets/models/subscription_entry.go | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/engine/access/rest/websockets/models/account_models.go b/engine/access/rest/websockets/models/account_models.go
index 7ad7243a96c..bad5e155721 100644
--- a/engine/access/rest/websockets/models/account_models.go
+++ b/engine/access/rest/websockets/models/account_models.go
@@ -2,7 +2,7 @@ package models
 
 // AccountStatusesResponse is the response message for 'events' topic.
 type AccountStatusesResponse struct {
-	BlockID       string        `json:"blockID"`
+	BlockID       string        `json:"block_id"`
 	Height        string        `json:"height"`
 	AccountEvents AccountEvents `json:"account_events"`
 	MessageIndex  uint64        `json:"message_index"`
diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go
index 48566bb090e..4656d9a5544 100644
--- a/engine/access/rest/websockets/models/subscription_entry.go
+++ b/engine/access/rest/websockets/models/subscription_entry.go
@@ -2,7 +2,7 @@ package models
 
 // SubscriptionEntry represents an active subscription entry.
 type SubscriptionEntry struct {
+	SubscriptionID string    `json:"subscription_id"` // ID is a client generated UUID for subscription
 	Topic          string    `json:"topic"`     // Topic of the subscription
-	SubscriptionID string    `json:"id"`        // Unique subscription ID
-	Arguments      Arguments `json:"arguments"` // Arguments of the subscription
+	Arguments      Arguments `json:"arguments"`
 }

From 7413e0281bdc30b827ed927256d028c623ec1436 Mon Sep 17 00:00:00 2001
From: Peter Argue <89119817+peterargue@users.noreply.github.com>
Date: Thu, 23 Jan 2025 10:46:08 -0800
Subject: [PATCH 9/9] Update
 engine/access/rest/websockets/models/subscription_entry.go

---
 engine/access/rest/websockets/models/subscription_entry.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/engine/access/rest/websockets/models/subscription_entry.go b/engine/access/rest/websockets/models/subscription_entry.go
index 4656d9a5544..9a60ab1a0d9 100644
--- a/engine/access/rest/websockets/models/subscription_entry.go
+++ b/engine/access/rest/websockets/models/subscription_entry.go
@@ -3,6 +3,6 @@ package models
 // SubscriptionEntry represents an active subscription entry.
 type SubscriptionEntry struct {
 	SubscriptionID string    `json:"subscription_id"` // ID is a client generated UUID for subscription
-	Topic          string    `json:"topic"`     // Topic of the subscription
+	Topic          string    `json:"topic"`           // Topic of the subscription
 	Arguments      Arguments `json:"arguments"`
 }