Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add arguments getter for data providers #6873

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
cb2fa7d
Added arguments getter for data providers, filled ListSubscriptionsMe…
UlyanaAndrukhiv Jan 13, 2025
0cd1186
Merge branch 'master' into UlianaAndrukhiv/6865-data-provider-arguments
UlyanaAndrukhiv Jan 13, 2025
acb6b02
Updatated TestListSubscriptions unit test
UlyanaAndrukhiv Jan 13, 2025
33e3913
Merge branch 'UlianaAndrukhiv/6865-data-provider-arguments' of github…
UlyanaAndrukhiv Jan 13, 2025
24fb036
Merge branch 'illia-malachyn/6845-unify-subscription-and-message-id' …
UlyanaAndrukhiv Jan 15, 2025
0e3d1cf
Reverted changes from illia-malachyn/6845-unify-subscription-and-mess…
UlyanaAndrukhiv Jan 15, 2025
bacec03
Updated godoc
UlyanaAndrukhiv Jan 15, 2025
4f23a22
Merge branch 'master' into UlianaAndrukhiv/6865-data-provider-arguments
UlyanaAndrukhiv Jan 16, 2025
b0c5ffa
Removed omitempty from ID and Topic for SubscriptionEntry
UlyanaAndrukhiv Jan 16, 2025
fe9f40b
Merge branch 'UlianaAndrukhiv/6865-data-provider-arguments' of github…
UlyanaAndrukhiv Jan 16, 2025
ca06def
Merge branch 'master' into UlianaAndrukhiv/6865-data-provider-arguments
UlyanaAndrukhiv Jan 20, 2025
59ce14a
Merge branch 'master' of github.com:The-K-R-O-K/flow-go into UlianaAn…
Guitarheroua Jan 23, 2025
d89f7da
Merge branch 'master' into UlianaAndrukhiv/6865-data-provider-arguments
Guitarheroua Jan 23, 2025
d2e11f9
fixed remarks and tests
Guitarheroua Jan 23, 2025
899b691
remove error_codes.go
Guitarheroua Jan 23, 2025
93bb915
Apply suggestions from code review
peterargue Jan 23, 2025
a7f3163
Merge branch 'master' into UlianaAndrukhiv/6865-data-provider-arguments
peterargue Jan 23, 2025
7413e02
Update engine/access/rest/websockets/models/subscription_entry.go
peterargue Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions engine/access/rest/websockets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, _ models.ListS
subs = append(subs, &models.SubscriptionEntry{
SubscriptionID: id.String(),
Topic: provider.Topic(),
Arguments: provider.Arguments(),
})
return nil
})
Expand Down
9 changes: 8 additions & 1 deletion engine/access/rest/websockets/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/onflow/flow-go/engine/access/rest/common/parser"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -298,7 +300,6 @@ func (s *WsControllerSuite) TestUnsubscribeRequest() {
require.True(t, ok)
require.Empty(t, response.Error)
require.Equal(t, request.SubscriptionID, response.SubscriptionID)
require.Equal(t, request.SubscriptionID, response.SubscriptionID)

return websocket.ErrCloseSent
}).
Expand Down Expand Up @@ -470,7 +471,12 @@ func (s *WsControllerSuite) TestListSubscriptions() {
done := make(chan struct{})

topic := dp.BlocksTopic
arguments := models.Arguments{
"start_block_id": unittest.IdentifierFixture().String(),
"block_status": parser.Finalized,
}
dataProvider.On("Topic").Return(topic)
dataProvider.On("Arguments").Return(arguments)
// data provider might finish on its own or controller will close it via Close()
dataProvider.On("Close").Return(nil).Maybe()
dataProvider.
Expand Down Expand Up @@ -514,6 +520,7 @@ func (s *WsControllerSuite) TestListSubscriptions() {
require.Equal(t, 1, len(response.Subscriptions))
require.Equal(t, subscriptionID, response.Subscriptions[0].SubscriptionID)
require.Equal(t, topic, response.Subscriptions[0].Topic)
require.Equal(t, arguments, response.Subscriptions[0].Arguments)
require.Equal(t, models.ListSubscriptionsAction, response.Action)

return websocket.ErrCloseSent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func NewAccountStatusesDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, accountStatusesArgs), // Set up a subscription to account statuses based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package data_providers
import (
"context"

"github.com/onflow/flow-go/engine/access/rest/websockets/models"
"github.com/onflow/flow-go/engine/access/subscription"
)

// baseDataProvider holds common objects for the provider
type baseDataProvider struct {
subscriptionID string
topic string
arguments models.Arguments
cancel context.CancelFunc
send chan<- interface{}
subscription subscription.Subscription
Expand All @@ -19,13 +21,15 @@ type baseDataProvider struct {
func newBaseDataProvider(
subscriptionID string,
topic string,
arguments models.Arguments,
cancel context.CancelFunc,
send chan<- interface{},
subscription subscription.Subscription,
) *baseDataProvider {
return &baseDataProvider{
subscriptionID: subscriptionID,
topic: topic,
arguments: arguments,
cancel: cancel,
send: send,
subscription: subscription,
Expand All @@ -42,6 +46,11 @@ func (b *baseDataProvider) Topic() string {
return b.topic
}

// Arguments returns the arguments associated with the data provider.
func (b *baseDataProvider) Arguments() models.Arguments {
return b.arguments
}

// Close terminates the data provider.
//
// No errors are expected during normal operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func NewBlockDigestsDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, blockArgs), // Set up a subscription to block digests based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewBlockHeadersDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, blockArgs), // Set up a subscription to block headers based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewBlocksDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, p.arguments), // Set up a subscription to blocks based on arguments.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package data_providers

import "github.com/onflow/flow-go/engine/access/rest/websockets/models"

// The DataProvider is the interface abstracts of the actual data provider used by the WebSocketCollector.
// It provides methods for retrieving the provider's unique SubscriptionID, topic, and a methods to close and run the provider.
type DataProvider interface {
// ID returns the unique identifier of the data provider.
ID() string
// Topic returns the topic associated with the data provider.
Topic() string
// Arguments returns the arguments associated with the data provider.
Arguments() models.Arguments
// Close terminates the data provider.
//
// No errors are expected during normal operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewEventsDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, eventArgs), // Set up a subscription to events based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (s *DataProviderFactorySuite) TestSupportedTopics() {
s.Require().NotNil(provider, "Expected provider for topic %s", test.topic)
s.Require().NoError(err, "Expected no error for topic %s", test.topic)
s.Require().Equal(test.topic, provider.Topic())
s.Require().Equal(test.arguments, provider.Arguments())

test.assertExpectations()
})
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func NewSendAndGetTransactionStatusesDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, sendTxStatusesArgs), // Set up a subscription to tx statuses based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewTransactionStatusesDataProvider(
p.baseDataProvider = newBaseDataProvider(
subscriptionID,
topic,
arguments,
cancel,
send,
p.createSubscription(subCtx, txStatusesArgs), // Set up a subscription to tx statuses based on arguments.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type Arguments map[string]interface{}
type SubscribeMessageRequest struct {
BaseMessageRequest
Topic string `json:"topic"` // Topic to subscribe to
Arguments Arguments `json:"arguments"` // Arguments are the arguments for the subscribed topic
Arguments Arguments `json:"arguments"` // Additional arguments for subscription
}

// SubscribeMessageResponse represents the response to a subscription request.
Expand Down
Loading