Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subscribe account statuses endpoint #762

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into illia-malachyn/747-subscribe-account-statu…
…tes-endpoint
illia-malachyn committed Oct 15, 2024
commit bbc52407cf0d079c0e9bc98b2c359b89e26b6fc9
139 changes: 139 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -1378,6 +1378,145 @@ func receiveBlocksFromClient[Client interface {
}
}

func (c *BaseClient) SubscribeBlockHeadersFromStartBlockID(
ctx context.Context,
startBlockID flow.Identifier,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromStartBlockIDRequest{
StartBlockId: startBlockID.Bytes(),
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartBlockID(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockHeadersFromStartHeight(
ctx context.Context,
startHeight uint64,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromStartHeightRequest{
StartBlockHeight: startHeight,
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromStartHeight(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func (c *BaseClient) SubscribeBlockHeadersFromLatest(
ctx context.Context,
blockStatus flow.BlockStatus,
opts ...grpc.CallOption,
) (<-chan flow.BlockHeader, <-chan error, error) {
status := convert.BlockStatusToEntity(blockStatus)
if status == entities.BlockStatus_BLOCK_UNKNOWN {
return nil, nil, newRPCError(errors.New("unknown block status"))
}

request := &access.SubscribeBlockHeadersFromLatestRequest{
BlockStatus: status,
}

subscribeClient, err := c.rpcClient.SubscribeBlockHeadersFromLatest(ctx, request, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

blockHeaderChan := make(chan flow.BlockHeader)
errChan := make(chan error)

go func() {
defer close(blockHeaderChan)
defer close(errChan)
receiveBlockHeadersFromClient(ctx, subscribeClient, blockHeaderChan, errChan)
}()

return blockHeaderChan, errChan, nil
}

func receiveBlockHeadersFromClient[Client interface {
Recv() (*access.SubscribeBlockHeadersResponse, error)
}](
ctx context.Context,
client Client,
blockHeadersChan chan<- flow.BlockHeader,
errChan chan<- error,
) {
sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

for {
// Receive the next blockHeader response
blockHeaderResponse, err := client.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}

sendErr(fmt.Errorf("error receiving blockHeader: %w", err))
return
}

blockHeader, err := convert.MessageToBlockHeader(blockHeaderResponse.GetHeader())
if err != nil {
sendErr(fmt.Errorf("error converting message to block header: %w", err))
return
}

select {
case <-ctx.Done():
return
case blockHeadersChan <- blockHeader:
}
}
}

func (c *BaseClient) SubscribeAccountStatusesFromStartHeight(
ctx context.Context,
startHeight uint64,
175 changes: 175 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
@@ -2783,6 +2783,181 @@ func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus,
}
}

func TestClient_SubscribeBlockHeaders(t *testing.T) {
blockHeaders := test.BlockHeaderGenerator()

generateBlockHeaderResponses := func(count uint64) []*access.SubscribeBlockHeadersResponse {
var resBlockHeaders []*access.SubscribeBlockHeadersResponse

for i := uint64(0); i < count; i++ {
header, err := convert.BlockHeaderToMessage(blockHeaders.New())
require.NoError(t, err)

resBlockHeaders = append(resBlockHeaders, &access.SubscribeBlockHeadersResponse{
Header: header,
})
}

return resBlockHeaders
}

t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
startHeight := uint64(1)
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{
ctx: ctx,
responses: generateBlockHeaderResponses(responseCount),
}

rpc.
On("SubscribeBlockHeadersFromStartHeight", ctx, mock.Anything).
Return(stream, nil)

blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartHeight(ctx, startHeight, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualHeader := <-blockHeadersCh
expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader())
require.NoError(t, err)
require.Equal(t, expectedHeader, actualHeader)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{
ctx: ctx,
responses: generateBlockHeaderResponses(responseCount),
}

rpc.
On("SubscribeBlockHeadersFromStartBlockID", ctx, mock.Anything).
Return(stream, nil)

startBlockID := convert.MessageToIdentifier(stream.responses[0].GetHeader().Id)
blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromStartBlockID(ctx, startBlockID, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualHeader := <-blockHeadersCh
expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader())
require.NoError(t, err)
require.Equal(t, expectedHeader, actualHeader)
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)

ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{
ctx: ctx,
responses: generateBlockHeaderResponses(responseCount),
}

rpc.
On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

for i := uint64(0); i < responseCount; i++ {
actualHeader := <-blockHeadersCh
expectedHeader, err := convert.MessageToBlockHeader(stream.responses[i].GetHeader())
require.NoError(t, err)
require.Equal(t, expectedHeader, actualHeader)
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
stream := &mockBlockHeaderClientStream[access.SubscribeBlockHeadersResponse]{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SubscribeBlockHeadersFromLatest", ctx, mock.Anything).
Return(stream, nil)

blockHeadersCh, errCh, err := c.SubscribeBlockHeadersFromLatest(ctx, flow.BlockStatusFinalized)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoBlockHeaders(t, blockHeadersCh, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
cancel()

require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))
}

type mockBlockHeaderClientStream[SubscribeBlockHeadersResponse any] struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*SubscribeBlockHeadersResponse
}

func (s *mockBlockHeaderClientStream[SubscribeBlockHeadersResponse]) Recv() (*SubscribeBlockHeadersResponse, error) {
if s.err != nil {
return nil, s.err
}

if s.offset >= len(s.responses) {
<-s.ctx.Done()
return nil, io.EOF
}
defer func() { s.offset++ }()

return s.responses[s.offset], nil
}

func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan BlockHeader, done func()) {
defer done()
for range blockHeadersChan {
require.FailNow(t, "should not receive block headers")
}
}

type mockAccountStatutesClientStream struct {
grpc.ClientStream

You are viewing a condensed version of this merge commit. You can view the full changes here.