Skip to content

Commit

Permalink
Merge pull request #772 from The-K-R-O-K/send-and-subscribe-tr5ansact…
Browse files Browse the repository at this point in the history
…ion-statuses-endpoint

Send and subscribe transaction statuses endpoint
  • Loading branch information
peterargue authored Oct 3, 2024
2 parents bd1d074 + 0a99ee4 commit 16ec51b
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 1 deletion.
7 changes: 7 additions & 0 deletions access/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ func (c *Client) GetTransactionResultsByBlockID(ctx context.Context, blockID flo
return c.grpc.GetTransactionResultsByBlockID(ctx, blockID)
}

func (c *Client) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
) (<-chan flow.TransactionResult, <-chan error, error) {
return c.grpc.SendAndSubscribeTransactionStatuses(ctx, tx)
}

func (c *Client) GetAccount(ctx context.Context, address flow.Address) (*flow.Account, error) {
return c.grpc.GetAccount(ctx, address)
}
Expand Down
72 changes: 72 additions & 0 deletions access/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,78 @@ func (c *BaseClient) SubscribeBlocksFromLatest(
return blocksChan, errChan, nil
}

func (c *BaseClient) SendAndSubscribeTransactionStatuses(
ctx context.Context,
tx flow.Transaction,
opts ...grpc.CallOption,
) (<-chan flow.TransactionResult, <-chan error, error) {
txMsg, err := convert.TransactionToMessage(tx)
if err != nil {
return nil, nil, newEntityToMessageError(entityTransaction, err)
}

req := &access.SendAndSubscribeTransactionStatusesRequest{
Transaction: txMsg,
EventEncodingVersion: c.eventEncoding,
}

subscribeClient, err := c.rpcClient.SendAndSubscribeTransactionStatuses(ctx, req, opts...)
if err != nil {
return nil, nil, newRPCError(err)
}

txStatusChan := make(chan flow.TransactionResult)
errChan := make(chan error)

sendErr := func(err error) {
select {
case <-ctx.Done():
case errChan <- err:
}
}

go func() {
defer close(txStatusChan)
defer close(errChan)

messageIndex := uint64(0)

for {
// Receive the next txResult response
txResultsResponse, err := subscribeClient.Recv()
if err != nil {
if err == io.EOF {
// End of stream, return gracefully
return
}
sendErr(fmt.Errorf("error receiving transaction result: %w", err))
return
}

if messageIndex != txResultsResponse.GetMessageIndex() {
sendErr(fmt.Errorf("tx result response was lost"))
return
}

txResult, err := convert.MessageToTransactionResult(txResultsResponse.GetTransactionResults(), c.jsonOptions)
if err != nil {
sendErr(fmt.Errorf("error converting transaction result: %w", err))
return
}

messageIndex++

select {
case <-ctx.Done():
return
case txStatusChan <- txResult:
}
}
}()

return txStatusChan, errChan, nil
}

func receiveBlocksFromClient[Client interface {
Recv() (*access.SubscribeBlocksResponse, error)
}](
Expand Down
154 changes: 154 additions & 0 deletions access/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2381,3 +2381,157 @@ func assertNoBlocks[T any](t *testing.T, blocksCh <-chan T, done func()) {
require.FailNow(t, "should not receive blocks")
}
}

func TestClient_SendAndSubscribeTransactionStatuses(t *testing.T) {
transactions := test.TransactionGenerator()

generateTransactionStatusResponses := func(count uint64, encodingVersion flow.EventEncodingVersion) []*access.SendAndSubscribeTransactionStatusesResponse {
var resTransactionResults []*access.SendAndSubscribeTransactionStatusesResponse
results := test.TransactionResultGenerator(encodingVersion)

for i := uint64(0); i < count; i++ {
expectedResult := results.New()
transactionResult, _ := convert.TransactionResultToMessage(expectedResult, encodingVersion)

response := &access.SendAndSubscribeTransactionStatusesResponse{
TransactionResults: transactionResult,
MessageIndex: i,
}

resTransactionResults = append(resTransactionResults, response)
}

return resTransactionResults
}

t.Run("Happy Path - CCF", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)
tx := transactions.New()

ctx, cancel := context.WithCancel(ctx)
stream := &mockTransactionStatusesClientStream{
ctx: ctx,
responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionCCF),
}

rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil)

txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

expectedCounter := uint64(0)

for i := uint64(0); i < responseCount; i++ {
actualTxResult := <-txResultCh
expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions)
require.NoError(t, err)
require.Equal(t, expectedTxResult, actualTxResult)
require.Equal(t, expectedCounter, stream.responses[i].MessageIndex)

expectedCounter++
}
cancel()

wg.Wait()
}))

t.Run("Happy Path - JSON-CDC", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
responseCount := uint64(100)
tx := transactions.New()

ctx, cancel := context.WithCancel(ctx)
stream := &mockTransactionStatusesClientStream{
ctx: ctx,
responses: generateTransactionStatusResponses(responseCount, flow.EventEncodingVersionJSONCDC),
}

rpc.On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).Return(stream, nil)

txResultCh, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, *tx)
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoErrors(t, errCh, wg.Done)

expectedCounter := uint64(0)
for i := uint64(0); i < responseCount; i++ {
actualTxResult := <-txResultCh
expectedTxResult, err := convert.MessageToTransactionResult(stream.responses[i].GetTransactionResults(), DefaultClientOptions().jsonOptions)
require.NoError(t, err)
require.Equal(t, expectedTxResult, actualTxResult)
require.Equal(t, expectedCounter, stream.responses[i].MessageIndex)

expectedCounter++
}
cancel()

wg.Wait()
}))

t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
ctx, cancel := context.WithCancel(ctx)
stream := &mockTransactionStatusesClientStream{
ctx: ctx,
err: status.Error(codes.Internal, "internal error"),
}

rpc.
On("SendAndSubscribeTransactionStatuses", ctx, mock.Anything).
Return(stream, nil)

txResultChan, errCh, err := c.SendAndSubscribeTransactionStatuses(ctx, flow.Transaction{})
require.NoError(t, err)

wg := sync.WaitGroup{}
wg.Add(1)
go assertNoTxResults(t, txResultChan, wg.Done)

errorCount := 0
for e := range errCh {
require.Error(t, e)
require.ErrorIs(t, e, stream.err)
errorCount += 1
}
cancel()

require.Equalf(t, 1, errorCount, "only 1 error is expected")

wg.Wait()
}))

}

type mockTransactionStatusesClientStream struct {
grpc.ClientStream

ctx context.Context
err error
offset int
responses []*access.SendAndSubscribeTransactionStatusesResponse
}

func (m *mockTransactionStatusesClientStream) Recv() (*access.SendAndSubscribeTransactionStatusesResponse, error) {
if m.err != nil {
return nil, m.err
}

if m.offset >= len(m.responses) {
<-m.ctx.Done()
return nil, io.EOF
}
defer func() { m.offset++ }()

return m.responses[m.offset], nil
}

func assertNoTxResults[TxStatus any](t *testing.T, txResultChan <-chan TxStatus, done func()) {
defer done()
for range txResultChan {
require.FailNow(t, "should not receive txStatus")
}
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.4
replace github.com/onflow/flow-go-sdk => ../

require (
github.com/onflow/cadence v1.0.0-preview.52
github.com/onflow/cadence v1.0.0
github.com/onflow/flow-cli/flowkit v1.11.0
github.com/onflow/flow-go-sdk v0.41.17
github.com/spf13/afero v1.11.0
Expand Down
1 change: 1 addition & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ github.com/onflow/cadence v1.0.0-preview.35/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmp
github.com/onflow/cadence v1.0.0-preview.36/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.38/go.mod h1:jOwvPSSLTr9TvaKMs7KKiBYMmpdpNNAFxBsjMlrqVD0=
github.com/onflow/cadence v1.0.0-preview.52/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU=
github.com/onflow/cadence v1.0.0/go.mod h1:7wvvecnAZtYOspLOS3Lh+FuAmMeSrXhAWiycC3kQ1UU=
github.com/onflow/crypto v0.25.0 h1:BeWbLsh3ZD13Ej+Uky6kg1PL1ZIVBDVX+2MVBNwqddg=
github.com/onflow/crypto v0.25.0/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
github.com/onflow/crypto v0.25.1/go.mod h1:C8FbaX0x8y+FxWjbkHy0Q4EASCDR9bSPWZqlpCLYyVI=
Expand Down

0 comments on commit 16ec51b

Please sign in to comment.