From 6b6ef84ebcb5d9dabd8841a46510da88ff7dc4aa Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Thu, 17 Oct 2024 13:35:57 +0300 Subject: [PATCH 1/3] refactor subscribe func and stream mock object --- access/grpc/convert/convert.go | 8 +- access/grpc/grpc.go | 339 +++++++--------------------- access/grpc/grpc_test.go | 388 +++++++++------------------------ 3 files changed, 199 insertions(+), 536 deletions(-) diff --git a/access/grpc/convert/convert.go b/access/grpc/convert/convert.go index 7b902a184..0fb768ec3 100644 --- a/access/grpc/convert/convert.go +++ b/access/grpc/convert/convert.go @@ -245,12 +245,16 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) { }, nil } -func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest { +func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + if m == nil { + return flow.BlockDigest{}, ErrEmptyMessage + } + return flow.BlockDigest{ BlockID: flow.BytesToID(m.GetBlockId()), Height: m.GetBlockHeight(), Timestamp: m.GetBlockTimestamp().AsTime(), - } + }, nil } func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse { diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index c2f668908..ef01017c7 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1188,14 +1188,10 @@ func (c *BaseClient) SubscribeBlocksFromStartBlockID( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.Block) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + return convert.MessageToBlock(response.GetBlock()) + } + blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") return blocksChan, errChan, nil } @@ -1221,14 +1217,10 @@ func (c *BaseClient) SubscribeBlocksFromStartHeight( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.Block) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + return convert.MessageToBlock(response.GetBlock()) + } + blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") return blocksChan, errChan, nil } @@ -1252,14 +1244,10 @@ func (c *BaseClient) SubscribeBlocksFromLatest( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.Block) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlocksFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { + return convert.MessageToBlock(response.GetBlock()) + } + blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") return blocksChan, errChan, nil } @@ -1301,7 +1289,6 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( messageIndex := uint64(0) for { - // Receive the next txResult response txResultsResponse, err := subscribeClient.Recv() if err != nil { if err == io.EOF { @@ -1336,48 +1323,6 @@ func (c *BaseClient) SendAndSubscribeTransactionStatuses( return txStatusChan, errChan, nil } -func receiveBlocksFromClient[Client interface { - Recv() (*access.SubscribeBlocksResponse, error) -}]( - ctx context.Context, - client Client, - blocksChan chan<- flow.Block, - errChan chan<- error, -) { - sendErr := func(err error) { - select { - case <-ctx.Done(): - case errChan <- err: - } - } - - for { - // Receive the next block response - blockResponse, err := client.Recv() - if err != nil { - if err == io.EOF { - // End of stream, return gracefully - return - } - - sendErr(fmt.Errorf("error receiving block: %w", err)) - return - } - - block, err := convert.MessageToBlock(blockResponse.GetBlock()) - if err != nil { - sendErr(fmt.Errorf("error converting message to block: %w", err)) - return - } - - select { - case <-ctx.Done(): - return - case blocksChan <- block: - } - } -} - func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( ctx context.Context, startBlockID flow.Identifier, @@ -1399,14 +1344,10 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( return nil, nil, newRPCError(err) } - blockHeaderChan := make(chan flow.BlockHeader) - errChan := make(chan error) - - go func() { - defer close(blockHeaderChan) - defer close(errChan) - receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) - }() + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + return convert.MessageToBlockHeader(response.GetHeader()) + } + blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") return blockHeaderChan, errChan, nil } @@ -1432,14 +1373,10 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( return nil, nil, newRPCError(err) } - blockHeaderChan := make(chan flow.BlockHeader) - errChan := make(chan error) - - go func() { - defer close(blockHeaderChan) - defer close(errChan) - receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) - }() + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + return convert.MessageToBlockHeader(response.GetHeader()) + } + blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") return blockHeaderChan, errChan, nil } @@ -1463,58 +1400,12 @@ func (c *BaseClient) SubscribeBlockHeadersFromLatest( return nil, nil, newRPCError(err) } - blockHeaderChan := make(chan flow.BlockHeader) - errChan := make(chan error) - - go func() { - defer close(blockHeaderChan) - defer close(errChan) - receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan) - }() - - return blockHeaderChan, errChan, nil -} - -func receiveBlockHeadersFromClient[Client interface { - Recv() (*access.SubscribeBlockHeadersResponse, error) -}]( - ctx context.Context, - client Client, - blockHeadersChan chan<- flow.BlockHeader, - errChan chan<- error, -) { - sendErr := func(err error) { - select { - case <-ctx.Done(): - case errChan <- err: - } + convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { + return convert.MessageToBlockHeader(response.GetHeader()) } + blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") - for { - // Receive the next blockHeader response - blockHeaderResponse, err := client.Recv() - if err != nil { - if err == io.EOF { - // End of stream, return gracefully - return - } - - sendErr(fmt.Errorf("error receiving blockHeader: %w", err)) - return - } - - blockHeader, err := convert.MessageToBlockHeader(blockHeaderResponse.GetHeader()) - if err != nil { - sendErr(fmt.Errorf("error converting message to block header: %w", err)) - return - } - - select { - case <-ctx.Done(): - return - case blockHeadersChan <- blockHeader: - } - } + return blockHeaderChan, errChan, nil } func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( @@ -1537,14 +1428,10 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( return nil, nil, newRPCError(err) } - accountStatutesChan := make(chan flow.AccountStatus) - errChan := make(chan error) - - go func() { - defer close(accountStatutesChan) - defer close(errChan) - receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) - }() + convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { + return convert.MessageToAccountStatus(response) + } + accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") return accountStatutesChan, errChan, nil } @@ -1569,14 +1456,10 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( return nil, nil, newRPCError(err) } - accountStatutesChan := make(chan flow.AccountStatus) - errChan := make(chan error) - - go func() { - defer close(accountStatutesChan) - defer close(errChan) - receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) - }() + convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { + return convert.MessageToAccountStatus(response) + } + accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") return accountStatutesChan, errChan, nil } @@ -1599,64 +1482,12 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( return nil, nil, newRPCError(err) } - accountStatutesChan := make(chan flow.AccountStatus) - errChan := make(chan error) - - go func() { - defer close(accountStatutesChan) - defer close(errChan) - receiveAccountStatusesFromStream(ctx, subscribeClient, accountStatutesChan, errChan) - }() - - return accountStatutesChan, errChan, nil -} - -func receiveAccountStatusesFromStream[Stream interface { - Recv() (*executiondata.SubscribeAccountStatusesResponse, error) -}]( - ctx context.Context, - stream Stream, - accountStatutesChan chan<- flow.AccountStatus, - errChan chan<- error, -) { - sendErr := func(err error) { - select { - case <-ctx.Done(): - case errChan <- err: - } + convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { + return convert.MessageToAccountStatus(response) } + accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") - var nextExpectedMsgIndex uint64 - for { - accountStatusResponse, err := stream.Recv() - if err != nil { - if err == io.EOF { - // End of stream, return gracefully - return - } - - sendErr(fmt.Errorf("error receiving account status: %w", err)) - return - } - - accountStatus, err := convert.MessageToAccountStatus(accountStatusResponse) - if err != nil { - sendErr(fmt.Errorf("error converting message to account status: %w", err)) - return - } - - if accountStatus.MessageIndex != nextExpectedMsgIndex { - sendErr(fmt.Errorf("message received out of order")) - return - } - nextExpectedMsgIndex = accountStatus.MessageIndex + 1 - - select { - case <-ctx.Done(): - return - case accountStatutesChan <- accountStatus: - } - } + return accountStatutesChan, errChan, nil } func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( @@ -1680,16 +1511,12 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.BlockDigest) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + return convert.MessageToBlockDigest(response) + } + blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blocksChan, errChan, nil + return blockDigestChan, errChan, nil } func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( @@ -1713,16 +1540,12 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.BlockDigest) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + return convert.MessageToBlockDigest(response) + } + blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blocksChan, errChan, nil + return blockDigestChan, errChan, nil } func (c *BaseClient) SubscribeBlockDigestsFromLatest( @@ -1744,26 +1567,23 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( return nil, nil, newRPCError(err) } - blocksChan := make(chan flow.BlockDigest) - errChan := make(chan error) - - go func() { - defer close(blocksChan) - defer close(errChan) - receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan) - }() + convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { + return convert.MessageToBlockDigest(response) + } + blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blocksChan, errChan, nil + return blockDigestChan, errChan, nil } -func receiveBlockDigestFromClient[Client interface { - Recv() (*access.SubscribeBlockDigestsResponse, error) -}]( +func subscribe[Response any, ClientResponse any]( ctx context.Context, - client Client, - blockDigestsChan chan<- flow.BlockDigest, - errChan chan<- error, -) { + receive func() (*ClientResponse, error), + convertResponse func(*ClientResponse) (Response, error), + topicNameForErrors string, +) (chan Response, chan error) { + subChan := make(chan Response) + errChan := make(chan error) + sendErr := func(err error) { select { case <-ctx.Done(): @@ -1771,25 +1591,34 @@ func receiveBlockDigestFromClient[Client interface { } } - for { - // Receive the next blockDigest response - blockDigestResponse, err := client.Recv() - if err != nil { - if err == io.EOF { - // End of stream, return gracefully + go func() { + defer close(subChan) + defer close(errChan) + + for { + resp, err := receive() + if err != nil { + if err == io.EOF { + return + } + + sendErr(fmt.Errorf("error receiving %s: %w", topicNameForErrors, err)) return } - sendErr(fmt.Errorf("error receiving blockDigest: %w", err)) - return - } - - blockDigest := convert.MessageToBlockDigest(blockDigestResponse) + response, err := convertResponse(resp) + if err != nil { + sendErr(fmt.Errorf("error converting %s: %w", topicNameForErrors, err)) + return + } - select { - case <-ctx.Done(): - return - case blockDigestsChan <- blockDigest: + select { + case <-ctx.Done(): + return + case subChan <- response: + } } - } + }() + + return subChan, errChan } diff --git a/access/grpc/grpc_test.go b/access/grpc/grpc_test.go index 30844d9a7..d92a8a039 100644 --- a/access/grpc/grpc_test.go +++ b/access/grpc/grpc_test.go @@ -1734,6 +1734,28 @@ func TestClient_GetExecutionResultByID(t *testing.T) { } func TestClient_SubscribeExecutionData(t *testing.T) { + generateExecutionDataResponse := func(t *testing.T, blockID flow.Identifier, height uint64) *executiondata.SubscribeExecutionDataResponse { + return &executiondata.SubscribeExecutionDataResponse{ + BlockHeight: height, + BlockExecutionData: &entities.BlockExecutionData{ + BlockId: blockID[:], + ChunkExecutionData: []*entities.ChunkExecutionData{}, + }, + BlockTimestamp: timestamppb.Now(), + } + } + + assertSubscribeExecutionDataArgs := func(t *testing.T, expected *executiondata.SubscribeExecutionDataRequest) func(args mock.Arguments) { + return func(args mock.Arguments) { + actual, ok := args.Get(1).(*executiondata.SubscribeExecutionDataRequest) + require.True(t, ok) + + assert.Equal(t, expected.EventEncodingVersion, actual.EventEncodingVersion) + assert.Equal(t, expected.StartBlockHeight, actual.StartBlockHeight) + assert.Equal(t, expected.StartBlockId, actual.StartBlockId) + } + } + ids := test.IdentifierGenerator() t.Run("Happy Path - by height", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { @@ -1746,7 +1768,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - stream := &mockExecutionDataStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeExecutionDataResponse]{ctx: ctx} for i := startHeight; i < startHeight+responseCount; i++ { stream.responses = append(stream.responses, generateExecutionDataResponse(t, ids.New(), i)) } @@ -1788,7 +1810,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - stream := &mockExecutionDataStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeExecutionDataResponse]{ctx: ctx} for i := startHeight; i < startHeight+responseCount; i++ { stream.responses = append(stream.responses, generateExecutionDataResponse(t, ids.New(), i)) } @@ -1828,7 +1850,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, } - stream := &mockExecutionDataStream{ + stream := &mockClientStream[executiondata.SubscribeExecutionDataResponse]{ err: status.Error(codes.Internal, "internal error"), } @@ -1841,7 +1863,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoEvents(t, eventCh, wg.Done) + go assertNoData(t, eventCh, wg.Done, "events") i := 0 for err := range errCh { @@ -1864,7 +1886,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { EventEncodingVersion: entities.EventEncodingVersion_CCF_V0, } - stream := &mockExecutionDataStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeExecutionDataResponse]{ctx: ctx} stream.responses = append(stream.responses, &executiondata.SubscribeExecutionDataResponse{ BlockHeight: startHeight, BlockExecutionData: nil, // nil BlockExecutionData should cause an error @@ -1879,7 +1901,7 @@ func TestClient_SubscribeExecutionData(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoEvents(t, eventCh, wg.Done) + go assertNoData(t, eventCh, wg.Done, "events") i := 0 for err := range errCh { @@ -1907,6 +1929,34 @@ func TestClient_SubscribeEvents(t *testing.T) { return res } + generateEventResponse := func(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { + responseEvents := make([]*entities.Event, 0, len(events)) + for _, e := range events { + eventMsg, err := convert.EventToMessage(e, encoding) + require.NoError(t, err) + responseEvents = append(responseEvents, eventMsg) + } + + return &executiondata.SubscribeEventsResponse{ + BlockHeight: height, + BlockId: blockID[:], + Events: responseEvents, + } + } + + assertSubscribeEventsArgs := func(t *testing.T, expected *executiondata.SubscribeEventsRequest) func(args mock.Arguments) { + return func(args mock.Arguments) { + actual, ok := args.Get(1).(*executiondata.SubscribeEventsRequest) + require.True(t, ok) + + assert.Equal(t, expected.Filter, actual.Filter) + assert.Equal(t, expected.EventEncodingVersion, actual.EventEncodingVersion) + assert.Equal(t, expected.HeartbeatInterval, actual.HeartbeatInterval) + assert.Equal(t, expected.StartBlockHeight, actual.StartBlockHeight) + assert.Equal(t, expected.StartBlockId, actual.StartBlockId) + } + } + t.Run("Happy Path - by height", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { responseCount := uint64(1000) startHeight := uint64(10) @@ -1928,7 +1978,7 @@ func TestClient_SubscribeEvents(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - stream := &mockEventStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeEventsResponse]{ctx: ctx} for i := startHeight; i < startHeight+responseCount; i++ { stream.responses = append(stream.responses, generateEventResponse(t, ids.New(), i, getEvents(2), flow.EventEncodingVersionCCF)) } @@ -1981,7 +2031,7 @@ func TestClient_SubscribeEvents(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - stream := &mockEventStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeEventsResponse]{ctx: ctx} for i := startHeight; i < startHeight+responseCount; i++ { stream.responses = append(stream.responses, generateEventResponse(t, ids.New(), i, getEvents(2), flow.EventEncodingVersionCCF)) } @@ -2028,7 +2078,7 @@ func TestClient_SubscribeEvents(t *testing.T) { StartBlockHeight: startHeight, } - stream := &mockEventStream{ + stream := &mockClientStream[executiondata.SubscribeEventsResponse]{ err: status.Error(codes.Internal, "internal error"), } @@ -2041,7 +2091,7 @@ func TestClient_SubscribeEvents(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoEvents(t, eventCh, wg.Done) + go assertNoData(t, eventCh, wg.Done, "events") i := 0 for err := range errCh { @@ -2071,7 +2121,7 @@ func TestClient_SubscribeEvents(t *testing.T) { StartBlockHeight: startHeight, } - stream := &mockEventStream{ctx: ctx} + stream := &mockClientStream[executiondata.SubscribeEventsResponse]{ctx: ctx} stream.responses = append(stream.responses, generateEventResponse(t, ids.New(), startHeight, getEvents(2), flow.EventEncodingVersionCCF)) // corrupt the event payload @@ -2086,7 +2136,7 @@ func TestClient_SubscribeEvents(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoEvents(t, eventCh, wg.Done) + go assertNoData(t, eventCh, wg.Done, "events") i := 0 for err := range errCh { @@ -2134,7 +2184,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockAccountStatutesClientStream{ + stream := &mockClientStream[executiondata.SubscribeAccountStatusesResponse]{ ctx: ctx, responses: generateAccountStatusesResponses(responseCount), } @@ -2175,7 +2225,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockAccountStatutesClientStream{ + stream := &mockClientStream[executiondata.SubscribeAccountStatusesResponse]{ ctx: ctx, responses: generateAccountStatusesResponses(responseCount), } @@ -2217,7 +2267,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockAccountStatutesClientStream{ + stream := &mockClientStream[executiondata.SubscribeAccountStatusesResponse]{ ctx: ctx, responses: generateAccountStatusesResponses(responseCount), } @@ -2256,7 +2306,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { t.Run("Stream returns error", executionDataClientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockExecutionDataRPCClient, c *BaseClient) { ctx, cancel := context.WithCancel(ctx) - stream := &mockAccountStatutesClientStream{ + stream := &mockClientStream[executiondata.SubscribeAccountStatusesResponse]{ ctx: ctx, err: status.Error(codes.Internal, "internal error"), } @@ -2270,7 +2320,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoAccountStatuses(t, accountStatuses, wg.Done) + go assertNoData(t, accountStatuses, wg.Done, "account statuses") errorCount := 0 for e := range errCh { @@ -2295,7 +2345,7 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream := &mockAccountStatutesClientStream{ + stream := &mockClientStream[executiondata.SubscribeAccountStatusesResponse]{ ctx: ctx, err: status.Error(codes.Internal, "message received out of order"), responses: generateUnorderedAccountStatusesResponses(2), @@ -2338,123 +2388,6 @@ func TestClient_SubscribeAccountStatuses(t *testing.T) { })) } -func generateEventResponse(t *testing.T, blockID flow.Identifier, height uint64, events []flow.Event, encoding flow.EventEncodingVersion) *executiondata.SubscribeEventsResponse { - responseEvents := make([]*entities.Event, 0, len(events)) - for _, e := range events { - eventMsg, err := convert.EventToMessage(e, encoding) - require.NoError(t, err) - responseEvents = append(responseEvents, eventMsg) - } - - return &executiondata.SubscribeEventsResponse{ - BlockHeight: height, - BlockId: blockID[:], - Events: responseEvents, - } -} - -func generateExecutionDataResponse(t *testing.T, blockID flow.Identifier, height uint64) *executiondata.SubscribeExecutionDataResponse { - return &executiondata.SubscribeExecutionDataResponse{ - BlockHeight: height, - BlockExecutionData: &entities.BlockExecutionData{ - BlockId: blockID[:], - ChunkExecutionData: []*entities.ChunkExecutionData{}, - }, - BlockTimestamp: timestamppb.Now(), - } -} - -func assertSubscribeEventsArgs(t *testing.T, expected *executiondata.SubscribeEventsRequest) func(args mock.Arguments) { - return func(args mock.Arguments) { - actual, ok := args.Get(1).(*executiondata.SubscribeEventsRequest) - require.True(t, ok) - - assert.Equal(t, expected.Filter, actual.Filter) - assert.Equal(t, expected.EventEncodingVersion, actual.EventEncodingVersion) - assert.Equal(t, expected.HeartbeatInterval, actual.HeartbeatInterval) - assert.Equal(t, expected.StartBlockHeight, actual.StartBlockHeight) - assert.Equal(t, expected.StartBlockId, actual.StartBlockId) - } -} - -func assertSubscribeExecutionDataArgs(t *testing.T, expected *executiondata.SubscribeExecutionDataRequest) func(args mock.Arguments) { - return func(args mock.Arguments) { - actual, ok := args.Get(1).(*executiondata.SubscribeExecutionDataRequest) - require.True(t, ok) - - assert.Equal(t, expected.EventEncodingVersion, actual.EventEncodingVersion) - assert.Equal(t, expected.StartBlockHeight, actual.StartBlockHeight) - assert.Equal(t, expected.StartBlockId, actual.StartBlockId) - } -} - -func assertNoErrors(t *testing.T, errCh <-chan error, done func()) { - defer done() - for err := range errCh { - require.NoError(t, err) - } -} - -func assertNoEvents[T any](t *testing.T, eventCh <-chan T, done func()) { - defer done() - for range eventCh { - t.Fatal("should not receive events") - } -} - -func assertNoAccountStatuses(t *testing.T, accountStatusesChan <-chan flow.AccountStatus, done func()) { - defer done() - for range accountStatusesChan { - require.FailNow(t, "should not receive account statuses") - } -} - -type mockEventStream struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*executiondata.SubscribeEventsResponse -} - -func (m *mockEventStream) Recv() (*executiondata.SubscribeEventsResponse, error) { - if m.err != nil { - return nil, m.err - } - - if m.offset >= len(m.responses) { - <-m.ctx.Done() - return nil, io.EOF - } - defer func() { m.offset++ }() - - return m.responses[m.offset], nil -} - -type mockExecutionDataStream struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*executiondata.SubscribeExecutionDataResponse -} - -func (m *mockExecutionDataStream) Recv() (*executiondata.SubscribeExecutionDataResponse, error) { - if m.err != nil { - return nil, m.err - } - - if m.offset >= len(m.responses) { - <-m.ctx.Done() - return nil, io.EOF - } - defer func() { m.offset++ }() - - return m.responses[m.offset], nil -} - func TestClient_SubscribeBlocks(t *testing.T) { blocks := test.BlockGenerator() @@ -2478,7 +2411,7 @@ func TestClient_SubscribeBlocks(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{ + stream := &mockClientStream[access.SubscribeBlocksResponse]{ ctx: ctx, responses: generateBlockResponses(responseCount), } @@ -2509,7 +2442,7 @@ func TestClient_SubscribeBlocks(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{ + stream := &mockClientStream[access.SubscribeBlocksResponse]{ ctx: ctx, responses: generateBlockResponses(responseCount), } @@ -2541,7 +2474,7 @@ func TestClient_SubscribeBlocks(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{ + stream := &mockClientStream[access.SubscribeBlocksResponse]{ ctx: ctx, responses: generateBlockResponses(responseCount), } @@ -2571,7 +2504,7 @@ func TestClient_SubscribeBlocks(t *testing.T) { t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { ctx, cancel := context.WithCancel(ctx) defer cancel() - stream := &mockBlockClientStream[access.SubscribeBlocksResponse]{ + stream := &mockClientStream[access.SubscribeBlocksResponse]{ ctx: ctx, err: status.Error(codes.Internal, "internal error"), } @@ -2585,7 +2518,7 @@ func TestClient_SubscribeBlocks(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoBlocks(t, blockCh, wg.Done) + go assertNoData(t, blockCh, wg.Done, "blocks") errorCount := 0 for e := range errCh { @@ -2599,36 +2532,6 @@ func TestClient_SubscribeBlocks(t *testing.T) { })) } -type mockBlockClientStream[SubscribeBlocksResponse any] struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*SubscribeBlocksResponse -} - -func (s *mockBlockClientStream[SubscribeBlocksResponse]) Recv() (*SubscribeBlocksResponse, error) { - if s.err != nil { - return nil, s.err - } - - if s.offset >= len(s.responses) { - <-s.ctx.Done() - return nil, io.EOF - } - defer func() { s.offset++ }() - - return s.responses[s.offset], nil -} - -func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) { - defer done() - for range blocksCh { - require.FailNow(t, "should not receive blocks") - } -} - func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { transactions := test.TransactionGenerator() @@ -2656,7 +2559,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { tx := transactions.New() ctx, cancel := context.WithCancel(ctx) - stream := &mockTransactionStatusesClientStream{ + stream := &mockClientStream[access.SendAndSubscribeTransactionStatusesResponse]{ ctx: ctx, responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionCCF), } @@ -2691,7 +2594,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { tx := transactions.New() ctx, cancel := context.WithCancel(ctx) - stream := &mockTransactionStatusesClientStream{ + stream := &mockClientStream[access.SendAndSubscribeTransactionStatusesResponse]{ ctx: ctx, responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionJSONCDC), } @@ -2722,7 +2625,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { ctx, cancel := context.WithCancel(ctx) - stream := &mockTransactionStatusesClientStream{ + stream := &mockClientStream[access.SendAndSubscribeTransactionStatusesResponse]{ ctx: ctx, err: status.Error(codes.Internal, "internal error"), } @@ -2736,7 +2639,7 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoTxResults(t, txResultChan, wg.Done) + go assertNoData(t, txResultChan, wg.Done, "transaction statutes") errorCount := 0 for e := range errCh { @@ -2753,36 +2656,6 @@ func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) { } -type mockTransactionStatusesClientStream struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*access.SendAndSubscribeTransactionStatusesResponse -} - -func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTransactionStatusesResponse, error) { - if m.err != nil { - return nil, m.err - } - - if m.offset >= len(m.responses) { - <-m.ctx.Done() - return nil, io.EOF - } - defer func() { m.offset++ }() - - return m.responses[m.offset], nil -} - -func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) { - defer done() - for range txResultChan { - require.FailNow(t, "should not receive txStatus") - } -} - func TestClient_SubscribeBlockHeaders(t *testing.T) { blockHeaders := test.BlockHeaderGenerator() @@ -2806,7 +2679,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + stream := &mockClientStream[access.SubscribeBlockHeadersResponse]{ ctx: ctx, responses: generateBlockHeaderResponses(responseCount), } @@ -2837,7 +2710,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + stream := &mockClientStream[access.SubscribeBlockHeadersResponse]{ ctx: ctx, responses: generateBlockHeaderResponses(responseCount), } @@ -2869,7 +2742,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + stream := &mockClientStream[access.SubscribeBlockHeadersResponse]{ ctx: ctx, responses: generateBlockHeaderResponses(responseCount), } @@ -2898,7 +2771,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{ + stream := &mockClientStream[access.SubscribeBlockHeadersResponse]{ ctx: ctx, err: status.Error(codes.Internal, "internal error"), } @@ -2912,7 +2785,7 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoBlockHeaders(t, blockHeadersCh, wg.Done) + go assertNoData(t, blockHeadersCh, wg.Done, "block headers") errorCount := 0 for e := range errCh { @@ -2928,36 +2801,6 @@ func TestClient_SubscribeBlockHeaders(t *testing.T) { })) } -type mockBlockHeaderClientStream[SubscribeBlockHeadersResponse any] struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*SubscribeBlockHeadersResponse -} - -func (s *mockBlockHeaderClientStream[SubscribeBlockHeadersResponse]) Recv() (*SubscribeBlockHeadersResponse, error) { - if s.err != nil { - return nil, s.err - } - - if s.offset >= len(s.responses) { - <-s.ctx.Done() - return nil, io.EOF - } - defer func() { s.offset++ }() - - return s.responses[s.offset], nil -} - -func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan BlockHeader, done func()) { - defer done() - for range blockHeadersChan { - require.FailNow(t, "should not receive block headers") - } -} - func TestClient_SubscribeBlockDigest(t *testing.T) { blockHeaders := test.BlockHeaderGenerator() @@ -2984,7 +2827,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + stream := &mockClientStream[access.SubscribeBlockDigestsResponse]{ ctx: ctx, responses: generateBlockDigestResponses(responseCount), } @@ -3002,7 +2845,8 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { for i := uint64(0); i < responseCount; i++ { actualDigest := <-blockDigestsCh - expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + expectedDigest, err := convert.MessageToBlockDigest(stream.responses[i]) + require.NoError(t, err) require.Equal(t, expectedDigest, actualDigest) } cancel() @@ -3014,7 +2858,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + stream := &mockClientStream[access.SubscribeBlockDigestsResponse]{ ctx: ctx, responses: generateBlockDigestResponses(responseCount), } @@ -3033,7 +2877,8 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { for i := uint64(0); i < responseCount; i++ { actualDigest := <-blockDigestsCh - expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + expectedDigest, err := convert.MessageToBlockDigest(stream.responses[i]) + require.NoError(t, err) require.Equal(t, expectedDigest, actualDigest) } cancel() @@ -3045,7 +2890,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { responseCount := uint64(100) ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + stream := &mockClientStream[access.SubscribeBlockDigestsResponse]{ ctx: ctx, responses: generateBlockDigestResponses(responseCount), } @@ -3063,7 +2908,8 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { for i := uint64(0); i < responseCount; i++ { actualDigest := <-blockDigestsCh - expectedDigest := convert.MessageToBlockDigest(stream.responses[i]) + expectedDigest, err := convert.MessageToBlockDigest(stream.responses[i]) + require.NoError(t, err) require.Equal(t, expectedDigest, actualDigest) } cancel() @@ -3073,7 +2919,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) { ctx, cancel := context.WithCancel(ctx) - stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{ + stream := &mockClientStream[access.SubscribeBlockDigestsResponse]{ ctx: ctx, err: status.Error(codes.Internal, "internal error"), } @@ -3087,7 +2933,7 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) - go assertNoBlockDigests(t, blockDigestsCh, wg.Done) + go assertNoData(t, blockDigestsCh, wg.Done, "block digests") errorCount := 0 for e := range errCh { @@ -3103,55 +2949,39 @@ func TestClient_SubscribeBlockDigest(t *testing.T) { })) } -type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct { - grpc.ClientStream - - ctx context.Context - err error - offset int - responses []*SubscribeBlockDigestsResponse -} - -func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) { - if s.err != nil { - return nil, s.err - } - - if s.offset >= len(s.responses) { - <-s.ctx.Done() - return nil, io.EOF +func assertNoErrors(t *testing.T, errCh <-chan error, done func()) { + defer done() + for err := range errCh { + require.NoError(t, err) } - defer func() { s.offset++ }() - - return s.responses[s.offset], nil } -func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) { +func assertNoData[T any](t *testing.T, dataCh <-chan T, done func(), topicNameForError string) { defer done() - for range blockDigestsChan { - require.FailNow(t, "should not receive block digests") + for range dataCh { + t.Fatalf("should not receive %s", topicNameForError) } } -type mockAccountStatutesClientStream struct { +type mockClientStream[Response any] struct { grpc.ClientStream ctx context.Context err error offset int - responses []*executiondata.SubscribeAccountStatusesResponse + responses []*Response } -func (m *mockAccountStatutesClientStream) Recv() (*executiondata.SubscribeAccountStatusesResponse, error) { - if m.err != nil { - return nil, m.err +func (s *mockClientStream[Response]) Recv() (*Response, error) { + if s.err != nil { + return nil, s.err } - if m.offset >= len(m.responses) { - <-m.ctx.Done() + if s.offset >= len(s.responses) { + <-s.ctx.Done() return nil, io.EOF } - defer func() { m.offset++ }() + defer func() { s.offset++ }() - return m.responses[m.offset], nil + return s.responses[s.offset], nil } From 5ba2321801ab95108a4face14aef9c2db7389e68 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Tue, 22 Oct 2024 17:07:09 +0300 Subject: [PATCH 2/3] replace topic for errors with obj type name --- access/grpc/grpc.go | 46 +++++++++++++++++---------------------------- examples/go.mod | 2 +- examples/go.sum | 1 + 3 files changed, 19 insertions(+), 30 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index ef01017c7..3c0e2a8b1 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "io" + "reflect" "github.com/onflow/flow/protobuf/go/flow/entities" "google.golang.org/grpc" @@ -1191,9 +1192,8 @@ func (c *BaseClient) SubscribeBlocksFromStartBlockID( convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } - blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") - return blocksChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockResponse) } func (c *BaseClient) SubscribeBlocksFromStartHeight( @@ -1220,9 +1220,8 @@ func (c *BaseClient) SubscribeBlocksFromStartHeight( convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } - blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") - return blocksChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockResponse) } func (c *BaseClient) SubscribeBlocksFromLatest( @@ -1247,9 +1246,8 @@ func (c *BaseClient) SubscribeBlocksFromLatest( convertBlockResponse := func(response *access.SubscribeBlocksResponse) (flow.Block, error) { return convert.MessageToBlock(response.GetBlock()) } - blocksChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockResponse, "block") - return blocksChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockResponse) } func (c *BaseClient) SendAndSubscribeTransactionStatuses( @@ -1347,9 +1345,8 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID( convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } - blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") - return blockHeaderChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse) } func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( @@ -1376,9 +1373,8 @@ func (c *BaseClient) SubscribeBlockHeadersFromStartHeight( convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } - blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") - return blockHeaderChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse) } func (c *BaseClient) SubscribeBlockHeadersFromLatest( @@ -1403,9 +1399,8 @@ func (c *BaseClient) SubscribeBlockHeadersFromLatest( convertBlockHeaderResponse := func(response *access.SubscribeBlockHeadersResponse) (flow.BlockHeader, error) { return convert.MessageToBlockHeader(response.GetHeader()) } - blockHeaderChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse, "block header") - return blockHeaderChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockHeaderResponse) } func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( @@ -1431,9 +1426,8 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { return convert.MessageToAccountStatus(response) } - accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") - return accountStatutesChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( @@ -1459,9 +1453,8 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { return convert.MessageToAccountStatus(response) } - accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") - return accountStatutesChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( @@ -1485,9 +1478,8 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( convertAccountStatusResponse := func(response *executiondata.SubscribeAccountStatusesResponse) (flow.AccountStatus, error) { return convert.MessageToAccountStatus(response) } - accountStatutesChan, errChan := subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse, "account status") - return accountStatutesChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( @@ -1514,9 +1506,8 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } - blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blockDigestChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse) } func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( @@ -1543,9 +1534,8 @@ func (c *BaseClient) SubscribeBlockDigestsFromStartHeight( convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } - blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blockDigestChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse) } func (c *BaseClient) SubscribeBlockDigestsFromLatest( @@ -1570,17 +1560,15 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( convertBlockDigestResponse := func(response *access.SubscribeBlockDigestsResponse) (flow.BlockDigest, error) { return convert.MessageToBlockDigest(response) } - blockDigestChan, errChan := subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse, "block digest") - return blockDigestChan, errChan, nil + return subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse) } func subscribe[Response any, ClientResponse any]( ctx context.Context, receive func() (*ClientResponse, error), convertResponse func(*ClientResponse) (Response, error), - topicNameForErrors string, -) (chan Response, chan error) { +) (<-chan Response, <-chan error, error) { subChan := make(chan Response) errChan := make(chan error) @@ -1602,13 +1590,13 @@ func subscribe[Response any, ClientResponse any]( return } - sendErr(fmt.Errorf("error receiving %s: %w", topicNameForErrors, err)) + sendErr(fmt.Errorf("error receiving %s: %w", reflect.TypeOf(resp).Name(), err)) return } response, err := convertResponse(resp) if err != nil { - sendErr(fmt.Errorf("error converting %s: %w", topicNameForErrors, err)) + sendErr(fmt.Errorf("error converting %s: %w", reflect.TypeOf(resp).Name(), err)) return } @@ -1620,5 +1608,5 @@ func subscribe[Response any, ClientResponse any]( } }() - return subChan, errChan + return subChan, errChan, nil } diff --git a/examples/go.mod b/examples/go.mod index c58c5e374..25d222804 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -7,7 +7,7 @@ toolchain go1.22.4 replace github.com/onflow/flow-go-sdk => ../ require ( - github.com/onflow/cadence v1.0.1-0.20241018173327-2e72919b18ac + github.com/onflow/cadence v1.2.1 github.com/onflow/flow-cli/flowkit v1.11.0 github.com/onflow/flow-go-sdk v0.41.17 github.com/spf13/afero v1.11.0 diff --git a/examples/go.sum b/examples/go.sum index 6ef27de99..e80f1bdfe 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -120,6 +120,7 @@ github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmp github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= github.com/onflow/cadence v1.0.0/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU= github.com/onflow/cadence v1.0.1-0.20241018173327-2e72919b18ac/go.mod h1:fJxxOAp1wnWDfOHT8GOc1ypsU0RR5E3z51AhG8Yf5jg= +github.com/onflow/cadence v1.2.1/go.mod h1:fJxxOAp1wnWDfOHT8GOc1ypsU0RR5E3z51AhG8Yf5jg= github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg= github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI= From 92a96e3f15ae007ba7eb4ffd1c85d4fb9edbb750 Mon Sep 17 00:00:00 2001 From: Illia Malachyn Date: Tue, 22 Oct 2024 18:23:40 +0300 Subject: [PATCH 3/3] introduce a new subscribe function for ordered responses --- access/grpc/grpc.go | 78 +++++++++++++++++++++++++++++++++++++++++++-- account.go | 4 +++ 2 files changed, 79 insertions(+), 3 deletions(-) diff --git a/access/grpc/grpc.go b/access/grpc/grpc.go index 3c0e2a8b1..88b3fdceb 100644 --- a/access/grpc/grpc.go +++ b/access/grpc/grpc.go @@ -1427,7 +1427,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartHeight( return convert.MessageToAccountStatus(response) } - return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) + return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( @@ -1454,7 +1454,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromStartBlockID( return convert.MessageToAccountStatus(response) } - return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) + return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( @@ -1479,7 +1479,7 @@ func (c *BaseClient) SubscribeAccountStatusesFromLatestBlock( return convert.MessageToAccountStatus(response) } - return subscribe(ctx, subscribeClient.Recv, convertAccountStatusResponse) + return subscribeContinuouslyIndexed(ctx, subscribeClient.Recv, convertAccountStatusResponse) } func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID( @@ -1564,6 +1564,13 @@ func (c *BaseClient) SubscribeBlockDigestsFromLatest( return subscribe(ctx, subscribeClient.Recv, convertBlockDigestResponse) } +// subscribe sets up a generic subscription that continuously receives and processes messages +// from a data source. It does not enforce any message ordering or indexing. The function takes +// three parameters: a receive() function for getting the next message, a convertResponse() function +// for transforming the message into the desired response type, and a context for cancellation. +// It returns two channels: one for the converted responses and another for errors. The function +// runs in a separate goroutine and handles errors gracefully, signaling completion when the +// context is canceled or an error occurs. func subscribe[Response any, ClientResponse any]( ctx context.Context, receive func() (*ClientResponse, error), @@ -1610,3 +1617,68 @@ func subscribe[Response any, ClientResponse any]( return subChan, errChan, nil } + +type IndexedMessage interface { + GetMessageIndex() uint64 +} + +// subscribeContinuouslyIndexed is a specialized version of the subscription function for cases +// where messages contain an index to ensure order. The Response type must implement the +// IndexedMessage interface, providing a GetMessageIndex method. The function checks that each +// received message's index matches the expected sequence, starting from zero and incrementing +// by one. If a message arrives out of order, an error is sent. This function helps clients +// detect any missed messages and ensures consistent message processing. +func subscribeContinuouslyIndexed[Response IndexedMessage, ClientResponse any]( + ctx context.Context, + receive func() (*ClientResponse, error), + convertResponse func(*ClientResponse) (Response, error), +) (<-chan Response, <-chan error, error) { + subChan := make(chan Response) + errChan := make(chan error) + + sendErr := func(err error) { + select { + case <-ctx.Done(): + case errChan <- err: + } + } + + go func() { + defer close(subChan) + defer close(errChan) + + var nextExpectedMessageIndex uint64 + + for { + resp, err := receive() + if err != nil { + if err == io.EOF { + return + } + + sendErr(fmt.Errorf("error receiving %s: %w", reflect.TypeOf(resp).Name(), err)) + return + } + + response, err := convertResponse(resp) + if err != nil { + sendErr(fmt.Errorf("error converting %s: %w", reflect.TypeOf(resp).Name(), err)) + return + } + + if response.GetMessageIndex() != nextExpectedMessageIndex { + sendErr(fmt.Errorf("message received out of order")) + return + } + nextExpectedMessageIndex += 1 + + select { + case <-ctx.Done(): + return + case subChan <- response: + } + } + }() + + return subChan, errChan, nil +} diff --git a/account.go b/account.go index a2baee8cd..f91e9045b 100644 --- a/account.go +++ b/account.go @@ -166,6 +166,10 @@ type AccountStatus struct { Results []*AccountStatusResult } +func (a AccountStatus) GetMessageIndex() uint64 { + return a.MessageIndex +} + type AccountStatusResult struct { Address Address Events []Event