diff --git a/access/api.go b/access/api.go index a65c35ac752..f5c7701c5bf 100644 --- a/access/api.go +++ b/access/api.go @@ -70,7 +70,7 @@ func TransactionResultToMessage(result *TransactionResult) *access.TransactionRe BlockId: result.BlockID[:], TransactionId: result.TransactionID[:], CollectionId: result.CollectionID[:], - BlockHeight: uint64(result.BlockHeight), + BlockHeight: result.BlockHeight, } } diff --git a/access/handler.go b/access/handler.go index 914fd2a805d..25bfa21295e 100644 --- a/access/handler.go +++ b/access/handler.go @@ -12,22 +12,30 @@ import ( "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/consensus/hotstuff/signature" "github.com/onflow/flow-go/engine/common/rpc/convert" + synceng "github.com/onflow/flow-go/engine/common/synchronization" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" ) type Handler struct { api API chain flow.Chain signerIndicesDecoder hotstuff.BlockSignerDecoder + finalizedHeaderCache *synceng.FinalizedHeaderCache + me module.Local } // HandlerOption is used to hand over optional constructor parameters type HandlerOption func(*Handler) -func NewHandler(api API, chain flow.Chain, options ...HandlerOption) *Handler { +var _ access.AccessAPIServer = (*Handler)(nil) + +func NewHandler(api API, chain flow.Chain, finalizedHeader *synceng.FinalizedHeaderCache, me module.Local, options ...HandlerOption) *Handler { h := &Handler{ api: api, chain: chain, + finalizedHeaderCache: finalizedHeader, + me: me, signerIndicesDecoder: &signature.NoopBlockSignerDecoder{}, } for _, opt := range options { @@ -142,6 +150,8 @@ func (h *Handler) GetCollectionByID( ctx context.Context, req *access.GetCollectionByIDRequest, ) (*access.CollectionResponse, error) { + metadata := h.buildMetadataResponse() + id, err := convert.CollectionID(req.GetId()) if err != nil { return nil, err @@ -159,6 +169,7 @@ func (h *Handler) GetCollectionByID( return &access.CollectionResponse{ Collection: colMsg, + Metadata: metadata, }, nil } @@ -167,6 +178,8 @@ func (h *Handler) SendTransaction( ctx context.Context, req *access.SendTransactionRequest, ) (*access.SendTransactionResponse, error) { + metadata := h.buildMetadataResponse() + txMsg := req.GetTransaction() tx, err := convert.MessageToTransaction(txMsg, h.chain) @@ -182,7 +195,8 @@ func (h *Handler) SendTransaction( txID := tx.ID() return &access.SendTransactionResponse{ - Id: txID[:], + Id: txID[:], + Metadata: metadata, }, nil } @@ -191,6 +205,8 @@ func (h *Handler) GetTransaction( ctx context.Context, req *access.GetTransactionRequest, ) (*access.TransactionResponse, error) { + metadata := h.buildMetadataResponse() + id, err := convert.TransactionID(req.GetId()) if err != nil { return nil, err @@ -203,6 +219,7 @@ func (h *Handler) GetTransaction( return &access.TransactionResponse{ Transaction: convert.TransactionToMessage(*tx), + Metadata: metadata, }, nil } @@ -211,6 +228,8 @@ func (h *Handler) GetTransactionResult( ctx context.Context, req *access.GetTransactionRequest, ) (*access.TransactionResultResponse, error) { + metadata := h.buildMetadataResponse() + id, err := convert.TransactionID(req.GetId()) if err != nil { return nil, err @@ -221,13 +240,18 @@ func (h *Handler) GetTransactionResult( return nil, err } - return TransactionResultToMessage(result), nil + message := TransactionResultToMessage(result) + message.Metadata = metadata + + return message, nil } func (h *Handler) GetTransactionResultsByBlockID( ctx context.Context, req *access.GetTransactionsByBlockIDRequest, ) (*access.TransactionResultsResponse, error) { + metadata := h.buildMetadataResponse() + id, err := convert.BlockID(req.GetBlockId()) if err != nil { return nil, err @@ -238,13 +262,18 @@ func (h *Handler) GetTransactionResultsByBlockID( return nil, err } - return TransactionResultsToMessage(results), nil + message := TransactionResultsToMessage(results) + message.Metadata = metadata + + return message, nil } func (h *Handler) GetTransactionsByBlockID( ctx context.Context, req *access.GetTransactionsByBlockIDRequest, ) (*access.TransactionsResponse, error) { + metadata := h.buildMetadataResponse() + id, err := convert.BlockID(req.GetBlockId()) if err != nil { return nil, err @@ -257,6 +286,7 @@ func (h *Handler) GetTransactionsByBlockID( return &access.TransactionsResponse{ Transactions: convert.TransactionsToMessages(transactions), + Metadata: metadata, }, nil } @@ -266,6 +296,8 @@ func (h *Handler) GetTransactionResultByIndex( ctx context.Context, req *access.GetTransactionByIndexRequest, ) (*access.TransactionResultResponse, error) { + metadata := h.buildMetadataResponse() + blockID, err := convert.BlockID(req.GetBlockId()) if err != nil { return nil, err @@ -276,7 +308,10 @@ func (h *Handler) GetTransactionResultByIndex( return nil, err } - return TransactionResultToMessage(result), nil + message := TransactionResultToMessage(result) + message.Metadata = metadata + + return message, nil } // GetAccount returns an account by address at the latest sealed block. @@ -284,6 +319,8 @@ func (h *Handler) GetAccount( ctx context.Context, req *access.GetAccountRequest, ) (*access.GetAccountResponse, error) { + metadata := h.buildMetadataResponse() + address := flow.BytesToAddress(req.GetAddress()) account, err := h.api.GetAccount(ctx, address) @@ -297,7 +334,8 @@ func (h *Handler) GetAccount( } return &access.GetAccountResponse{ - Account: accountMsg, + Account: accountMsg, + Metadata: metadata, }, nil } @@ -306,6 +344,8 @@ func (h *Handler) GetAccountAtLatestBlock( ctx context.Context, req *access.GetAccountAtLatestBlockRequest, ) (*access.AccountResponse, error) { + metadata := h.buildMetadataResponse() + address, err := convert.Address(req.GetAddress(), h.chain) if err != nil { return nil, err @@ -322,7 +362,8 @@ func (h *Handler) GetAccountAtLatestBlock( } return &access.AccountResponse{ - Account: accountMsg, + Account: accountMsg, + Metadata: metadata, }, nil } @@ -330,6 +371,8 @@ func (h *Handler) GetAccountAtBlockHeight( ctx context.Context, req *access.GetAccountAtBlockHeightRequest, ) (*access.AccountResponse, error) { + metadata := h.buildMetadataResponse() + address, err := convert.Address(req.GetAddress(), h.chain) if err != nil { return nil, err @@ -346,7 +389,8 @@ func (h *Handler) GetAccountAtBlockHeight( } return &access.AccountResponse{ - Account: accountMsg, + Account: accountMsg, + Metadata: metadata, }, nil } @@ -355,6 +399,8 @@ func (h *Handler) ExecuteScriptAtLatestBlock( ctx context.Context, req *access.ExecuteScriptAtLatestBlockRequest, ) (*access.ExecuteScriptResponse, error) { + metadata := h.buildMetadataResponse() + script := req.GetScript() arguments := req.GetArguments() @@ -364,7 +410,8 @@ func (h *Handler) ExecuteScriptAtLatestBlock( } return &access.ExecuteScriptResponse{ - Value: value, + Value: value, + Metadata: metadata, }, nil } @@ -373,6 +420,8 @@ func (h *Handler) ExecuteScriptAtBlockHeight( ctx context.Context, req *access.ExecuteScriptAtBlockHeightRequest, ) (*access.ExecuteScriptResponse, error) { + metadata := h.buildMetadataResponse() + script := req.GetScript() arguments := req.GetArguments() blockHeight := req.GetBlockHeight() @@ -383,7 +432,8 @@ func (h *Handler) ExecuteScriptAtBlockHeight( } return &access.ExecuteScriptResponse{ - Value: value, + Value: value, + Metadata: metadata, }, nil } @@ -392,6 +442,8 @@ func (h *Handler) ExecuteScriptAtBlockID( ctx context.Context, req *access.ExecuteScriptAtBlockIDRequest, ) (*access.ExecuteScriptResponse, error) { + metadata := h.buildMetadataResponse() + script := req.GetScript() arguments := req.GetArguments() blockID := convert.MessageToIdentifier(req.GetBlockId()) @@ -402,7 +454,8 @@ func (h *Handler) ExecuteScriptAtBlockID( } return &access.ExecuteScriptResponse{ - Value: value, + Value: value, + Metadata: metadata, }, nil } @@ -411,6 +464,8 @@ func (h *Handler) GetEventsForHeightRange( ctx context.Context, req *access.GetEventsForHeightRangeRequest, ) (*access.EventsResponse, error) { + metadata := h.buildMetadataResponse() + eventType, err := convert.EventType(req.GetType()) if err != nil { return nil, err @@ -429,7 +484,8 @@ func (h *Handler) GetEventsForHeightRange( return nil, err } return &access.EventsResponse{ - Results: resultEvents, + Results: resultEvents, + Metadata: metadata, }, nil } @@ -438,6 +494,8 @@ func (h *Handler) GetEventsForBlockIDs( ctx context.Context, req *access.GetEventsForBlockIDsRequest, ) (*access.EventsResponse, error) { + metadata := h.buildMetadataResponse() + eventType, err := convert.EventType(req.GetType()) if err != nil { return nil, err @@ -459,12 +517,15 @@ func (h *Handler) GetEventsForBlockIDs( } return &access.EventsResponse{ - Results: resultEvents, + Results: resultEvents, + Metadata: metadata, }, nil } // GetLatestProtocolStateSnapshot returns the latest serializable Snapshot func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *access.GetLatestProtocolStateSnapshotRequest) (*access.ProtocolStateSnapshotResponse, error) { + metadata := h.buildMetadataResponse() + snapshot, err := h.api.GetLatestProtocolStateSnapshot(ctx) if err != nil { return nil, err @@ -472,6 +533,7 @@ func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *acces return &access.ProtocolStateSnapshotResponse{ SerializedSnapshot: snapshot, + Metadata: metadata, }, nil } @@ -479,6 +541,8 @@ func (h *Handler) GetLatestProtocolStateSnapshot(ctx context.Context, req *acces // AN might receive multiple receipts with conflicting results for unsealed blocks. // If this case happens, since AN is not able to determine which result is the correct one until the block is sealed, it has to pick one result to respond to this query. For now, we return the result from the latest received receipt. func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.GetExecutionResultForBlockIDRequest) (*access.ExecutionResultForBlockIDResponse, error) { + metadata := h.buildMetadataResponse() + blockID := convert.MessageToIdentifier(req.GetBlockId()) result, err := h.api.GetExecutionResultForBlockID(ctx, blockID) @@ -486,10 +550,12 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access. return nil, err } - return executionResultToMessages(result) + return executionResultToMessages(result, metadata) } func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) { + metadata := h.buildMetadataResponse() + signerIDs, err := h.signerIndicesDecoder.DecodeSignerIDs(block.Header) if err != nil { return nil, err // the block was retrieved from local storage - so no errors are expected @@ -504,13 +570,17 @@ func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flo } else { msg = convert.BlockToMessageLight(block) } + return &access.BlockResponse{ Block: msg, BlockStatus: entities.BlockStatus(status), + Metadata: metadata, }, nil } func (h *Handler) blockHeaderResponse(header *flow.Header, status flow.BlockStatus) (*access.BlockHeaderResponse, error) { + metadata := h.buildMetadataResponse() + signerIDs, err := h.signerIndicesDecoder.DecodeSignerIDs(header) if err != nil { return nil, err // the block was retrieved from local storage - so no errors are expected @@ -524,15 +594,32 @@ func (h *Handler) blockHeaderResponse(header *flow.Header, status flow.BlockStat return &access.BlockHeaderResponse{ Block: msg, BlockStatus: entities.BlockStatus(status), + Metadata: metadata, }, nil } -func executionResultToMessages(er *flow.ExecutionResult) (*access.ExecutionResultForBlockIDResponse, error) { +// buildMetadataResponse builds and returns the metadata response object. +func (h *Handler) buildMetadataResponse() *entities.Metadata { + lastFinalizedHeader := h.finalizedHeaderCache.Get() + blockId := lastFinalizedHeader.ID() + nodeId := h.me.NodeID() + + return &entities.Metadata{ + LatestFinalizedBlockId: blockId[:], + LatestFinalizedHeight: lastFinalizedHeader.Height, + NodeId: nodeId[:], + } +} + +func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Metadata) (*access.ExecutionResultForBlockIDResponse, error) { execResult, err := convert.ExecutionResultToMessage(er) if err != nil { return nil, err } - return &access.ExecutionResultForBlockIDResponse{ExecutionResult: execResult}, nil + return &access.ExecutionResultForBlockIDResponse{ + ExecutionResult: execResult, + Metadata: metadata, + }, nil } func blockEventsToMessages(blocks []flow.BlockEvents) ([]*access.EventsResponse_Result, error) { diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 1dfca6a258e..fd4b74699fa 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -971,6 +971,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.rpcMetricsEnabled, builder.apiRatelimits, builder.apiBurstlimits, + builder.Me, ) if err != nil { return nil, err @@ -979,6 +980,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.RpcEng, err = engineBuilder. WithLegacy(). WithBlockSignerDecoder(signature.NewBlockSignerDecoder(builder.Committee)). + WithFinalizedHeaderCache(builder.FinalizedHeader). Build() if err != nil { return nil, err diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 472ae398260..295c76a26a2 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -1038,6 +1038,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.rpcMetricsEnabled, builder.apiRatelimits, builder.apiBurstlimits, + builder.Me, ) if err != nil { return nil, err diff --git a/engine/access/access_test.go b/engine/access/access_test.go index a42953489ba..a2af4f64481 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "testing" + "time" "github.com/dgraph-io/badger/v2" "github.com/google/go-cmp/cmp" @@ -21,6 +22,7 @@ import ( "github.com/onflow/flow-go/access" hsmock "github.com/onflow/flow-go/consensus/hotstuff/mocks" "github.com/onflow/flow-go/consensus/hotstuff/model" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/engine/access/ingestion" accessmock "github.com/onflow/flow-go/engine/access/mock" @@ -28,6 +30,7 @@ import ( "github.com/onflow/flow-go/engine/access/rpc/backend" factorymock "github.com/onflow/flow-go/engine/access/rpc/backend/mock" "github.com/onflow/flow-go/engine/common/rpc/convert" + synceng "github.com/onflow/flow-go/engine/common/synchronization" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/factory" "github.com/onflow/flow-go/model/flow/filter" @@ -48,22 +51,27 @@ import ( type Suite struct { suite.Suite - state *protocol.State - snapshot *protocol.Snapshot - epochQuery *protocol.EpochQuery - params *protocol.Params - signerIndicesDecoder *hsmock.BlockSignerDecoder - signerIds flow.IdentifierList - log zerolog.Logger - net *mocknetwork.Network - request *module.Requester - collClient *accessmock.AccessAPIClient - execClient *accessmock.ExecutionAPIClient - me *module.Local - rootBlock *flow.Header - chainID flow.ChainID - metrics *metrics.NoopCollector - backend *backend.Backend + state *protocol.State + sealedSnapshot *protocol.Snapshot + finalSnapshot *protocol.Snapshot + epochQuery *protocol.EpochQuery + params *protocol.Params + signerIndicesDecoder *hsmock.BlockSignerDecoder + signerIds flow.IdentifierList + log zerolog.Logger + net *mocknetwork.Network + request *module.Requester + collClient *accessmock.AccessAPIClient + execClient *accessmock.ExecutionAPIClient + me *module.Local + rootBlock *flow.Header + sealedBlock *flow.Header + finalizedBlock *flow.Header + chainID flow.ChainID + metrics *metrics.NoopCollector + backend *backend.Backend + finalizationDistributor *pubsub.FinalizationDistributor + finalizedHeaderCache *synceng.FinalizedHeaderCache } // TestAccess tests scenarios which exercise multiple API calls using both the RPC handler and the ingest engine @@ -76,14 +84,30 @@ func (suite *Suite) SetupTest() { suite.log = zerolog.New(os.Stderr) suite.net = new(mocknetwork.Network) suite.state = new(protocol.State) - suite.snapshot = new(protocol.Snapshot) + suite.finalSnapshot = new(protocol.Snapshot) + suite.sealedSnapshot = new(protocol.Snapshot) + + suite.rootBlock = unittest.BlockHeaderFixture(unittest.WithHeaderHeight(0)) + suite.sealedBlock = suite.rootBlock + suite.finalizedBlock = unittest.BlockHeaderWithParentFixture(suite.sealedBlock) suite.epochQuery = new(protocol.EpochQuery) - suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() - suite.state.On("Final").Return(suite.snapshot, nil).Maybe() - suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe() + suite.state.On("Sealed").Return(suite.sealedSnapshot, nil).Maybe() + suite.state.On("Final").Return(suite.finalSnapshot, nil).Maybe() + suite.finalSnapshot.On("Epochs").Return(suite.epochQuery).Maybe() + suite.sealedSnapshot.On("Head").Return( + func() *flow.Header { + return suite.sealedBlock + }, + nil, + ).Maybe() + suite.finalSnapshot.On("Head").Return( + func() *flow.Header { + return suite.finalizedBlock + }, + nil, + ).Maybe() - suite.rootBlock = unittest.BlockHeaderFixture(unittest.WithHeaderHeight(0)) suite.params = new(protocol.Params) suite.params.On("Root").Return(suite.rootBlock, nil) suite.params.On("SporkRootBlockHeight").Return(suite.rootBlock.Height, nil) @@ -107,6 +131,20 @@ func (suite *Suite) SetupTest() { suite.chainID = flow.Testnet suite.metrics = metrics.NewNoopCollector() + + suite.finalizationDistributor = pubsub.NewFinalizationDistributor() + + var err error + suite.finalizedHeaderCache, err = synceng.NewFinalizedHeaderCache(suite.log, suite.state, suite.finalizationDistributor) + require.NoError(suite.T(), err) + + unittest.RequireCloseBefore(suite.T(), suite.finalizedHeaderCache.Ready(), time.Second, "expect to start before timeout") +} + +func (suite *Suite) TearDownTest() { + if suite.finalizedHeaderCache != nil { + unittest.RequireCloseBefore(suite.T(), suite.finalizedHeaderCache.Done(), time.Second, "expect to stop before timeout") + } } func (suite *Suite) RunTest( @@ -135,8 +173,7 @@ func (suite *Suite) RunTest( backend.DefaultSnapshotHistoryLimit, nil, ) - - handler := access.NewHandler(suite.backend, suite.chainID.Chain(), access.WithBlockSignerDecoder(suite.signerIndicesDecoder)) + handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me, access.WithBlockSignerDecoder(suite.signerIndicesDecoder)) f(handler, db, all) }) } @@ -158,7 +195,7 @@ func (suite *Suite) TestSendAndGetTransaction() { Return(referenceBlock, nil). Twice() - suite.snapshot. + suite.finalSnapshot. On("Head"). Return(referenceBlock, nil). Once() @@ -196,15 +233,14 @@ func (suite *Suite) TestSendAndGetTransaction() { func (suite *Suite) TestSendExpiredTransaction() { suite.RunTest(func(handler *access.Handler, _ *badger.DB, _ *storage.All) { - referenceBlock := unittest.BlockHeaderFixture() + referenceBlock := suite.finalizedBlock + transaction := unittest.TransactionFixture() + transaction.SetReferenceBlockID(referenceBlock.ID()) // create latest block that is past the expiry window latestBlock := unittest.BlockHeaderFixture() latestBlock.Height = referenceBlock.Height + flow.DefaultTransactionExpiry*2 - transaction := unittest.TransactionFixture() - transaction.SetReferenceBlockID(referenceBlock.ID()) - refSnapshot := new(protocol.Snapshot) suite.state. @@ -216,10 +252,8 @@ func (suite *Suite) TestSendExpiredTransaction() { Return(referenceBlock, nil). Twice() - suite.snapshot. - On("Head"). - Return(latestBlock, nil). - Once() + //Advancing final state to expire ref block + suite.finalizedBlock = latestBlock req := &accessproto.SendTransactionRequest{ Transaction: convert.TransactionToMessage(transaction.TransactionBody), @@ -244,9 +278,9 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() { transaction := unittest.TransactionFixture() transaction.SetReferenceBlockID(referenceBlock.ID()) - // setup the state and snapshot mock expectations - suite.state.On("AtBlockID", referenceBlock.ID()).Return(suite.snapshot, nil) - suite.snapshot.On("Head").Return(referenceBlock, nil) + // setup the state and finalSnapshot mock expectations + suite.state.On("AtBlockID", referenceBlock.ID()).Return(suite.finalSnapshot, nil) + suite.finalSnapshot.On("Head").Return(referenceBlock, nil) // create storage metrics := metrics.NewNoopCollector() @@ -312,7 +346,7 @@ func (suite *Suite) TestSendTransactionToRandomCollectionNode() { nil, ) - handler := access.NewHandler(backend, suite.chainID.Chain()) + handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) // Send transaction 1 resp, err := handler.SendTransaction(context.Background(), sendReq1) @@ -408,7 +442,7 @@ func (suite *Suite) TestGetBlockByIDAndHeight() { require.Equal(suite.T(), expectedMessage, actual) } - suite.snapshot.On("Head").Return(block1.Header, nil) + suite.finalSnapshot.On("Head").Return(block1.Header, nil) suite.Run("get header 1 by ID", func() { // get header by ID id := block1.ID() @@ -575,7 +609,7 @@ func (suite *Suite) TestGetSealedTransaction() { results := bstorage.NewExecutionResults(suite.metrics, db) receipts := bstorage.NewExecutionReceipts(suite.metrics, db, results, bstorage.DefaultCacheSize) enIdentities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) - enNodeIDs := flow.IdentifierList(enIdentities.NodeIDs()) + enNodeIDs := enIdentities.NodeIDs() // create block -> collection -> transactions block, collection := suite.createChain() @@ -587,19 +621,17 @@ func (suite *Suite) TestGetSealedTransaction() { Once() suite.request.On("Request", mock.Anything, mock.Anything).Return() - suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() - colIdentities := unittest.IdentityListFixture(1, unittest.WithRole(flow.RoleCollection)) allIdentities := append(colIdentities, enIdentities...) - suite.snapshot.On("Identities", mock.Anything).Return(allIdentities, nil).Once() + suite.finalSnapshot.On("Identities", mock.Anything).Return(allIdentities, nil).Once() exeEventResp := execproto.GetTransactionResultResponse{ Events: nil, } // generate receipts - executionReceipts := unittest.ReceiptsForBlockFixture(&block, enNodeIDs) + executionReceipts := unittest.ReceiptsForBlockFixture(block, enNodeIDs) // assume execution node returns an empty list of events suite.execClient.On("GetTransactionResult", mock.Anything, mock.Anything).Return(&exeEventResp, nil) @@ -640,12 +672,12 @@ func (suite *Suite) TestGetSealedTransaction() { nil, ) - handler := access.NewHandler(backend, suite.chainID.Chain()) + handler := access.NewHandler(backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, rpc.Config{}, nil, nil, all.Blocks, all.Headers, collections, transactions, receipts, - results, suite.chainID, metrics, metrics, 0, 0, false, false, nil, nil) + results, suite.chainID, metrics, metrics, 0, 0, false, false, nil, nil, suite.me) require.NoError(suite.T(), err) - rpcEng, err := rpcEngBuilder.WithLegacy().Build() + rpcEng, err := rpcEngBuilder.WithFinalizedHeaderCache(suite.finalizedHeaderCache).WithLegacy().Build() require.NoError(suite.T(), err) // create the ingest engine @@ -654,9 +686,9 @@ func (suite *Suite) TestGetSealedTransaction() { require.NoError(suite.T(), err) // 1. Assume that follower engine updated the block storage and the protocol state. The block is reported as sealed - err = all.Blocks.Store(&block) + err = all.Blocks.Store(block) require.NoError(suite.T(), err) - suite.snapshot.On("Head").Return(block.Header, nil).Twice() + suite.sealedBlock = block.Header background, cancel := context.WithCancel(context.Background()) defer cancel() @@ -674,9 +706,8 @@ func (suite *Suite) TestGetSealedTransaction() { // 3. Request engine is used to request missing collection suite.request.On("EntityByID", collection.ID(), mock.Anything).Return() - // 4. Ingest engine receives the requested collection and all the execution receipts - ingestEng.OnCollection(originID, &collection) + ingestEng.OnCollection(originID, collection) for _, r := range executionReceipts { err = ingestEng.Process(channels.ReceiveReceipts, enNodeIDs[0], r) @@ -707,7 +738,8 @@ func (suite *Suite) TestExecuteScript() { receipts := bstorage.NewExecutionReceipts(suite.metrics, db, results, bstorage.DefaultCacheSize) identities := unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution)) - suite.snapshot.On("Identities", mock.Anything).Return(identities, nil) + suite.sealedSnapshot.On("Identities", mock.Anything).Return(identities, nil) + suite.finalSnapshot.On("Identities", mock.Anything).Return(identities, nil) // create a mock connection factory connFactory := new(factorymock.ConnectionFactory) @@ -734,7 +766,7 @@ func (suite *Suite) TestExecuteScript() { nil, ) - handler := access.NewHandler(suite.backend, suite.chainID.Chain()) + handler := access.NewHandler(suite.backend, suite.chainID.Chain(), suite.finalizedHeaderCache, suite.me) // initialize metrics related storage metrics := metrics.NewNoopCollector() @@ -753,33 +785,32 @@ func (suite *Suite) TestExecuteScript() { transactions, results, receipts, metrics, collectionsToMarkFinalized, collectionsToMarkExecuted, blocksToMarkExecuted, nil) require.NoError(suite.T(), err) + // create another block as a predecessor of the block created earlier + prevBlock := unittest.BlockWithParentFixture(suite.finalizedBlock) + // create a block and a seal pointing to that block - lastBlock := unittest.BlockFixture() - lastBlock.Header.Height = 2 - err = all.Blocks.Store(&lastBlock) + lastBlock := unittest.BlockWithParentFixture(prevBlock.Header) + err = all.Blocks.Store(lastBlock) require.NoError(suite.T(), err) err = db.Update(operation.IndexBlockHeight(lastBlock.Header.Height, lastBlock.ID())) require.NoError(suite.T(), err) - suite.snapshot.On("Head").Return(lastBlock.Header, nil).Once() - + //update latest sealed block + suite.sealedBlock = lastBlock.Header // create execution receipts for each of the execution node and the last block - executionReceipts := unittest.ReceiptsForBlockFixture(&lastBlock, identities.NodeIDs()) + executionReceipts := unittest.ReceiptsForBlockFixture(lastBlock, identities.NodeIDs()) // notify the ingest engine about the receipts for _, r := range executionReceipts { err = ingestEng.ProcessLocal(r) require.NoError(suite.T(), err) } - // create another block as a predecessor of the block created earlier - prevBlock := unittest.BlockFixture() - prevBlock.Header.Height = lastBlock.Header.Height - 1 - err = all.Blocks.Store(&prevBlock) + err = all.Blocks.Store(prevBlock) require.NoError(suite.T(), err) err = db.Update(operation.IndexBlockHeight(prevBlock.Header.Height, prevBlock.ID())) require.NoError(suite.T(), err) // create execution receipts for each of the execution node and the previous block - executionReceipts = unittest.ReceiptsForBlockFixture(&prevBlock, identities.NodeIDs()) + executionReceipts = unittest.ReceiptsForBlockFixture(prevBlock, identities.NodeIDs()) // notify the ingest engine about the receipts for _, r := range executionReceipts { err = ingestEng.ProcessLocal(r) @@ -803,8 +834,17 @@ func (suite *Suite) TestExecuteScript() { suite.execClient.On("ExecuteScriptAtBlockID", ctx, &executionReq).Return(&executionResp, nil).Once() + finalizedHeader := suite.finalizedHeaderCache.Get() + finalizedHeaderId := finalizedHeader.ID() + nodeId := suite.me.NodeID() + expectedResp := accessproto.ExecuteScriptResponse{ Value: executionResp.GetValue(), + Metadata: &entitiesproto.Metadata{ + LatestFinalizedBlockId: finalizedHeaderId[:], + LatestFinalizedHeight: finalizedHeader.Height, + NodeId: nodeId[:], + }, } return &expectedResp } @@ -816,10 +856,9 @@ func (suite *Suite) TestExecuteScript() { } suite.Run("execute script at latest block", func() { - suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() suite.state. On("AtBlockID", lastBlock.ID()). - Return(suite.snapshot, nil) + Return(suite.sealedSnapshot, nil) expectedResp := setupExecClientMock(lastBlock.ID()) req := accessproto.ExecuteScriptAtLatestBlockRequest{ @@ -832,7 +871,7 @@ func (suite *Suite) TestExecuteScript() { suite.Run("execute script at block id", func() { suite.state. On("AtBlockID", prevBlock.ID()). - Return(suite.snapshot, nil) + Return(suite.sealedSnapshot, nil) expectedResp := setupExecClientMock(prevBlock.ID()) id := prevBlock.ID() @@ -847,7 +886,7 @@ func (suite *Suite) TestExecuteScript() { suite.Run("execute script at block height", func() { suite.state. On("AtBlockID", prevBlock.ID()). - Return(suite.snapshot, nil) + Return(suite.sealedSnapshot, nil) expectedResp := setupExecClientMock(prevBlock.ID()) req := accessproto.ExecuteScriptAtBlockHeightRequest{ @@ -860,7 +899,79 @@ func (suite *Suite) TestExecuteScript() { }) } -func (suite *Suite) createChain() (flow.Block, flow.Collection) { +// TestRpcEngineBuilderWithFinalizedHeaderCache test checks whether the RPC builder can construct the engine correctly +// only when the WithFinalizedHeaderCache method has been called. +func (suite *Suite) TestRpcEngineBuilderWithFinalizedHeaderCache() { + unittest.RunWithBadgerDB(suite.T(), func(db *badger.DB) { + all := util.StorageLayer(suite.T(), db) + results := bstorage.NewExecutionResults(suite.metrics, db) + receipts := bstorage.NewExecutionReceipts(suite.metrics, db, results, bstorage.DefaultCacheSize) + + // initialize storage + metrics := metrics.NewNoopCollector() + transactions := bstorage.NewTransactions(metrics, db) + collections := bstorage.NewCollections(db, transactions) + + rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, rpc.Config{}, nil, nil, all.Blocks, all.Headers, collections, transactions, receipts, + results, suite.chainID, metrics, metrics, 0, 0, false, false, nil, nil, suite.me) + require.NoError(suite.T(), err) + + rpcEng, err := rpcEngBuilder.WithLegacy().WithBlockSignerDecoder(suite.signerIndicesDecoder).Build() + require.Error(suite.T(), err) + require.Nil(suite.T(), rpcEng) + + rpcEng, err = rpcEngBuilder.WithFinalizedHeaderCache(suite.finalizedHeaderCache).Build() + require.NoError(suite.T(), err) + require.NotNil(suite.T(), rpcEng) + }) +} + +// TestLastFinalizedBlockHeightResult test checks whether the response from a GetBlockHeaderByID request contains +// the finalized block height and ID even when the finalized block height has been changed. +func (suite *Suite) TestLastFinalizedBlockHeightResult() { + suite.RunTest(func(handler *access.Handler, db *badger.DB, all *storage.All) { + block := unittest.BlockWithParentFixture(suite.finalizedBlock) + newFinalizedBlock := unittest.BlockWithParentFixture(block.Header) + + // store new block + require.NoError(suite.T(), all.Blocks.Store(block)) + + assertFinalizedBlockHeader := func(resp *accessproto.BlockHeaderResponse, err error) { + require.NoError(suite.T(), err) + require.NotNil(suite.T(), resp) + + finalizedHeaderId := suite.finalizedBlock.ID() + nodeId := suite.me.NodeID() + + require.Equal(suite.T(), &entitiesproto.Metadata{ + LatestFinalizedBlockId: finalizedHeaderId[:], + LatestFinalizedHeight: suite.finalizedBlock.Height, + NodeId: nodeId[:], + }, resp.Metadata) + } + + id := block.ID() + req := &accessproto.GetBlockHeaderByIDRequest{ + Id: id[:], + } + + resp, err := handler.GetBlockHeaderByID(context.Background(), req) + assertFinalizedBlockHeader(resp, err) + + suite.finalizedBlock = newFinalizedBlock.Header + // report new finalized block to finalized blocks cache + suite.finalizationDistributor.OnFinalizedBlock(model.BlockFromFlow(suite.finalizedBlock)) + time.Sleep(time.Millisecond * 100) // give enough time to process async event + + resp, err = handler.GetBlockHeaderByID(context.Background(), req) + assertFinalizedBlockHeader(resp, err) + }) +} + +// TestLastFinalizedBlockHeightResult tests on example of the GetBlockHeaderByID function that the LastFinalizedBlock +// field in the response matches the finalized header from cache. It also tests that the LastFinalizedBlock field is +// updated correctly when a block with a greater height is finalized. +func (suite *Suite) createChain() (*flow.Block, *flow.Collection) { collection := unittest.CollectionFixture(10) refBlockID := unittest.IdentifierFixture() // prepare cluster committee members @@ -875,9 +986,8 @@ func (suite *Suite) createChain() (flow.Block, flow.Collection) { ReferenceBlockID: refBlockID, SignerIndices: indices, } - block := unittest.BlockFixture() - block.Payload.Guarantees = []*flow.CollectionGuarantee{guarantee} - block.Header.PayloadHash = block.Payload.Hash() + block := unittest.BlockWithParentFixture(suite.finalizedBlock) + block.SetPayload(unittest.PayloadFixture(unittest.WithGuarantees(guarantee))) cluster := new(protocol.Cluster) cluster.On("Members").Return(clusterCommittee, nil) @@ -885,13 +995,12 @@ func (suite *Suite) createChain() (flow.Block, flow.Collection) { epoch.On("ClusterByChainID", mock.Anything).Return(cluster, nil) epochs := new(protocol.EpochQuery) epochs.On("Current").Return(epoch) - snap := protocol.NewSnapshot(suite.T()) + snap := new(protocol.Snapshot) snap.On("Epochs").Return(epochs).Maybe() snap.On("Params").Return(suite.params).Maybe() snap.On("Head").Return(block.Header, nil).Maybe() - suite.state.On("AtBlockID", mock.Anything).Return(snap).Once() // initial height lookup in ingestion engine suite.state.On("AtBlockID", refBlockID).Return(snap) - return block, collection + return block, &collection } diff --git a/engine/access/ingestion/engine_test.go b/engine/access/ingestion/engine_test.go index 2f3afe79fd2..db32e51b0ad 100644 --- a/engine/access/ingestion/engine_test.go +++ b/engine/access/ingestion/engine_test.go @@ -9,6 +9,9 @@ import ( "testing" "time" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" + synceng "github.com/onflow/flow-go/engine/common/synchronization" + "github.com/rs/zerolog" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -43,17 +46,19 @@ type Suite struct { params *protocol.Params } - me *module.Local - request *module.Requester - provider *mocknetwork.Engine - blocks *storage.Blocks - headers *storage.Headers - collections *storage.Collections - transactions *storage.Transactions - receipts *storage.ExecutionReceipts - results *storage.ExecutionResults - seals *storage.Seals - downloader *downloadermock.Downloader + me *module.Local + request *module.Requester + provider *mocknetwork.Engine + blocks *storage.Blocks + headers *storage.Headers + collections *storage.Collections + transactions *storage.Transactions + receipts *storage.ExecutionReceipts + results *storage.ExecutionResults + seals *storage.Seals + downloader *downloadermock.Downloader + sealedBlock *flow.Header + finalizedBlock *flow.Header eng *Engine cancel context.CancelFunc @@ -76,9 +81,16 @@ func (suite *Suite) SetupTest() { suite.proto.state = new(protocol.FollowerState) suite.proto.snapshot = new(protocol.Snapshot) suite.proto.params = new(protocol.Params) + suite.finalizedBlock = unittest.BlockHeaderFixture(unittest.WithHeaderHeight(0)) suite.proto.state.On("Identity").Return(obsIdentity, nil) suite.proto.state.On("Final").Return(suite.proto.snapshot, nil) suite.proto.state.On("Params").Return(suite.proto.params) + suite.proto.snapshot.On("Head").Return( + func() *flow.Header { + return suite.finalizedBlock + }, + nil, + ).Maybe() suite.me = new(module.Local) suite.me.On("NodeID").Return(obsIdentity.NodeID) @@ -104,11 +116,16 @@ func (suite *Suite) SetupTest() { blocksToMarkExecuted, err := stdmap.NewTimes(100) require.NoError(suite.T(), err) + finalizationDistributor := pubsub.NewFinalizationDistributor() + + finalizedHeaderCache, err := synceng.NewFinalizedHeaderCache(log, suite.proto.state, finalizationDistributor) + require.NoError(suite.T(), err) + rpcEngBuilder, err := rpc.NewBuilder(log, suite.proto.state, rpc.Config{}, nil, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, suite.receipts, suite.results, flow.Testnet, metrics.NewNoopCollector(), metrics.NewNoopCollector(), 0, - 0, false, false, nil, nil) + 0, false, false, nil, nil, suite.me) require.NoError(suite.T(), err) - rpcEng, err := rpcEngBuilder.WithLegacy().Build() + rpcEng, err := rpcEngBuilder.WithLegacy().WithFinalizedHeaderCache(finalizedHeaderCache).Build() require.NoError(suite.T(), err) eng, err := New(log, net, suite.proto.state, suite.me, suite.request, suite.blocks, suite.headers, suite.collections, @@ -369,7 +386,7 @@ func (suite *Suite) TestRequestMissingCollections() { // consider collections are missing for all blocks suite.blocks.On("GetLastFullBlockHeight").Return(startHeight-1, nil) // consider the last test block as the head - suite.proto.snapshot.On("Head").Return(blocks[blkCnt-1].Header, nil) + suite.finalizedBlock = blocks[blkCnt-1].Header // p is the probability of not receiving the collection before the next poll and it // helps simulate the slow trickle of the requested collections being received @@ -556,7 +573,7 @@ func (suite *Suite) TestUpdateLastFullBlockReceivedIndex() { }) // consider the last test block as the head - suite.proto.snapshot.On("Head").Return(finalizedBlk.Header, nil) + suite.finalizedBlock = finalizedBlk.Header suite.Run("full block height index is created and advanced if not present", func() { // simulate the absence of the full block height index diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 69bde45c23b..34e0fa584f8 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -4,6 +4,9 @@ import ( "context" "fmt" + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" + synceng "github.com/onflow/flow-go/engine/common/synchronization" + "math/rand" "net/http" "os" @@ -50,6 +53,8 @@ type RestAPITestSuite struct { chainID flow.ChainID metrics *metrics.NoopCollector rpcEng *rpc.Engine + sealedBlock *flow.Header + finalizedBlock *flow.Header // storage blocks *storagemock.Blocks @@ -66,9 +71,23 @@ func (suite *RestAPITestSuite) SetupTest() { suite.state = new(protocol.State) suite.sealedSnaphost = new(protocol.Snapshot) suite.finalizedSnapshot = new(protocol.Snapshot) + suite.sealedBlock = unittest.BlockHeaderFixture(unittest.WithHeaderHeight(0)) + suite.finalizedBlock = unittest.BlockHeaderWithParentFixture(suite.sealedBlock) suite.state.On("Sealed").Return(suite.sealedSnaphost, nil) suite.state.On("Final").Return(suite.finalizedSnapshot, nil) + suite.sealedSnaphost.On("Head").Return( + func() *flow.Header { + return suite.sealedBlock + }, + nil, + ).Maybe() + suite.finalizedSnapshot.On("Head").Return( + func() *flow.Header { + return suite.finalizedBlock + }, + nil, + ).Maybe() suite.blocks = new(storagemock.Blocks) suite.headers = new(storagemock.Headers) suite.transactions = new(storagemock.Transactions) @@ -99,11 +118,17 @@ func (suite *RestAPITestSuite) SetupTest() { RESTListenAddr: unittest.DefaultAddress, } + finalizationDistributor := pubsub.NewFinalizationDistributor() + + var err error + finalizedHeaderCache, err := synceng.NewFinalizedHeaderCache(suite.log, suite.state, finalizationDistributor) + require.NoError(suite.T(), err) + rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, config, suite.collClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, nil, suite.executionResults, suite.chainID, suite.metrics, suite.metrics, 0, 0, false, - false, nil, nil) + false, nil, nil, suite.me) assert.NoError(suite.T(), err) - suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() + suite.rpcEng, err = rpcEngBuilder.WithLegacy().WithFinalizedHeaderCache(finalizedHeaderCache).Build() assert.NoError(suite.T(), err) unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) @@ -136,10 +161,8 @@ func (suite *RestAPITestSuite) TestGetBlock() { suite.executionResults.On("ByBlockID", block.ID()).Return(execResult, nil) } - sealedBlock := testBlocks[len(testBlocks)-1] - finalizedBlock := testBlocks[len(testBlocks)-2] - suite.sealedSnaphost.On("Head").Return(sealedBlock.Header, nil) - suite.finalizedSnapshot.On("Head").Return(finalizedBlock.Header, nil) + suite.sealedBlock = testBlocks[len(testBlocks)-1].Header + suite.finalizedBlock = testBlocks[len(testBlocks)-2].Header client := suite.restAPIClient() @@ -227,7 +250,7 @@ func (suite *RestAPITestSuite) TestGetBlock() { require.NoError(suite.T(), err) assert.Equal(suite.T(), http.StatusOK, resp.StatusCode) assert.Len(suite.T(), actualBlocks, 1) - assert.Equal(suite.T(), finalizedBlock.ID().String(), actualBlocks[0].Header.Id) + assert.Equal(suite.T(), suite.finalizedBlock.ID().String(), actualBlocks[0].Header.Id) }) suite.Run("GetBlockByHeight for height=sealed happy path", func() { @@ -239,7 +262,7 @@ func (suite *RestAPITestSuite) TestGetBlock() { require.NoError(suite.T(), err) assert.Equal(suite.T(), http.StatusOK, resp.StatusCode) assert.Len(suite.T(), actualBlocks, 1) - assert.Equal(suite.T(), sealedBlock.ID().String(), actualBlocks[0].Header.Id) + assert.Equal(suite.T(), suite.sealedBlock.ID().String(), actualBlocks[0].Header.Id) }) suite.Run("GetBlockByID with a non-existing block ID", func() { diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index 360e9f81ba2..8342669fca3 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -88,6 +88,7 @@ func NewBuilder(log zerolog.Logger, rpcMetricsEnabled bool, apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the Access API e.g. Ping->100, GetTransaction->300 apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the Access API e.g. Ping->50, GetTransaction->10 + me module.Local, ) (*RPCEngineBuilder, error) { log = log.With().Str("engine", "rpc").Logger() @@ -196,7 +197,7 @@ func NewBuilder(log zerolog.Logger, chain: chainID.Chain(), } - builder := NewRPCEngineBuilder(eng) + builder := NewRPCEngineBuilder(eng, me) if rpcMetricsEnabled { builder.WithMetrics() } diff --git a/engine/access/rpc/engine_builder.go b/engine/access/rpc/engine_builder.go index 97fa875cef9..9f843c2b8cc 100644 --- a/engine/access/rpc/engine_builder.go +++ b/engine/access/rpc/engine_builder.go @@ -4,12 +4,15 @@ import ( "fmt" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access" "github.com/onflow/flow-go/access" legacyaccess "github.com/onflow/flow-go/access/legacy" "github.com/onflow/flow-go/consensus/hotstuff" + synceng "github.com/onflow/flow-go/engine/common/synchronization" + "github.com/onflow/flow-go/module" ) type RPCEngineBuilder struct { @@ -18,13 +21,16 @@ type RPCEngineBuilder struct { // optional parameters, only one can be set during build phase signerIndicesDecoder hotstuff.BlockSignerDecoder handler accessproto.AccessAPIServer // Use the parent interface instead of implementation, so that we can assign it to proxy. + finalizedHeaderCache *synceng.FinalizedHeaderCache + me module.Local } // NewRPCEngineBuilder helps to build a new RPC engine. -func NewRPCEngineBuilder(engine *Engine) *RPCEngineBuilder { +func NewRPCEngineBuilder(engine *Engine, me module.Local) *RPCEngineBuilder { // the default handler will use the engine.backend implementation return &RPCEngineBuilder{ Engine: engine, + me: me, } } @@ -57,6 +63,19 @@ func (builder *RPCEngineBuilder) WithNewHandler(handler accessproto.AccessAPISer return builder } +// WithFinalizedHeaderCache method specifies that the newly created `AccessAPIServer` should use +// the given `FinalizedHeaderCache` to retrieve information about the finalized block that will be included +// in the server's responses. +// Caution: +// When injecting `BlockSignerDecoder` (via the WithBlockSignerDecoder method), you must also inject +// the `FinalizedHeaderCache` or the builder will error during the build step. +// +// The method returns a self-reference for chaining. +func (builder *RPCEngineBuilder) WithFinalizedHeaderCache(cache *synceng.FinalizedHeaderCache) *RPCEngineBuilder { + builder.finalizedHeaderCache = cache + return builder +} + // WithLegacy specifies that a legacy access API should be instantiated // Returns self-reference for chaining. func (builder *RPCEngineBuilder) WithLegacy() *RPCEngineBuilder { @@ -88,10 +107,13 @@ func (builder *RPCEngineBuilder) Build() (*Engine, error) { } handler := builder.handler if handler == nil { + if builder.finalizedHeaderCache == nil { + return nil, fmt.Errorf("FinalizedHeaderCache (via method `WithFinalizedHeaderCache`) has to be specified") + } if builder.signerIndicesDecoder == nil { - handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain) + handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me) } else { - handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, access.WithBlockSignerDecoder(builder.signerIndicesDecoder)) + handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me, access.WithBlockSignerDecoder(builder.signerIndicesDecoder)) } } accessproto.RegisterAccessAPIServer(builder.unsecureGrpcServer, handler) diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 59f292cf80c..0c7c1500b6f 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -8,6 +8,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" + synceng "github.com/onflow/flow-go/engine/common/synchronization" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -109,10 +114,19 @@ func (suite *RateLimitTestSuite) SetupTest() { "Ping": suite.rateLimit, } + block := unittest.BlockHeaderFixture() + suite.snapshot.On("Head").Return(block, nil) + + finalizationDistributor := pubsub.NewFinalizationDistributor() + + var err error + finalizedHeaderCache, err := synceng.NewFinalizedHeaderCache(suite.log, suite.state, finalizationDistributor) + require.NoError(suite.T(), err) + rpcEngBuilder, err := NewBuilder(suite.log, suite.state, config, suite.collClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, nil, - nil, suite.chainID, suite.metrics, suite.metrics, 0, 0, false, false, apiRateLimt, apiBurstLimt) + nil, suite.chainID, suite.metrics, suite.metrics, 0, 0, false, false, apiRateLimt, apiBurstLimt, suite.me) assert.NoError(suite.T(), err) - suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() + suite.rpcEng, err = rpcEngBuilder.WithLegacy().WithFinalizedHeaderCache(finalizedHeaderCache).Build() assert.NoError(suite.T(), err) unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index 66933a15dc7..056702d527c 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -7,6 +7,11 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub" + synceng "github.com/onflow/flow-go/engine/common/synchronization" + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -101,10 +106,18 @@ func (suite *SecureGRPCTestSuite) SetupTest() { // save the public key to use later in tests later suite.publicKey = networkingKey.PublicKey() + block := unittest.BlockHeaderFixture() + suite.snapshot.On("Head").Return(block, nil) + + finalizationDistributor := pubsub.NewFinalizationDistributor() + + finalizedHeaderCache, err := synceng.NewFinalizedHeaderCache(suite.log, suite.state, finalizationDistributor) + require.NoError(suite.T(), err) + rpcEngBuilder, err := rpc.NewBuilder(suite.log, suite.state, config, suite.collClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, nil, - nil, suite.chainID, suite.metrics, suite.metrics, 0, 0, false, false, nil, nil) + nil, suite.chainID, suite.metrics, suite.metrics, 0, 0, false, false, nil, nil, suite.me) assert.NoError(suite.T(), err) - suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() + suite.rpcEng, err = rpcEngBuilder.WithLegacy().WithFinalizedHeaderCache(finalizedHeaderCache).Build() assert.NoError(suite.T(), err) unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) diff --git a/go.mod b/go.mod index 21a9faa6018..9bb96532dad 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 github.com/onflow/flow-go-sdk v0.40.0 github.com/onflow/flow-go/crypto v0.24.7 - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible diff --git a/go.sum b/go.sum index 79d22d8b924..1264a78f6c3 100644 --- a/go.sum +++ b/go.sum @@ -1238,8 +1238,8 @@ github.com/onflow/flow-go-sdk v0.40.0 h1:s8uwoyTquN8tjdXpqGmNkXTjf79yUII8JExc5QE github.com/onflow/flow-go-sdk v0.40.0/go.mod h1:34dxXk9Hp/bQw6Zy6+H44Xo0kQU+aJyQoqdDxq00rJM= github.com/onflow/flow-go/crypto v0.24.7 h1:RCLuB83At4z5wkAyUCF7MYEnPoIIOHghJaODuJyEoW0= github.com/onflow/flow-go/crypto v0.24.7/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7Q6u+bCI78lfNX0= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d h1:Wl8bE1YeZEcRNnCpxw2rikOEaivuYKDrnJd2vsfIWoA= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 h1:QxQxCgce0tvAn/ibnEVYcUFRpy9QLxdfLRavKWYptvU= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8 h1:XcSR/n2aSVO7lOEsKScYALcpHlfowLwicZ9yVbL6bnA= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8/go.mod h1:73C8FlT4L/Qe4Cf5iXUNL8b2pvu4zs5dJMMJ5V2TjUI= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/insecure/go.mod b/insecure/go.mod index 1c74525425e..00c415ad3cd 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -186,7 +186,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 // indirect github.com/onflow/flow-ft/lib/go/contracts v0.7.0 // indirect github.com/onflow/flow-go-sdk v0.40.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d // indirect + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 // indirect github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8 // indirect github.com/onflow/sdks v0.5.0 // indirect github.com/onsi/ginkgo/v2 v2.6.1 // indirect diff --git a/insecure/go.sum b/insecure/go.sum index 598f99e4cdb..157fe50a04a 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1186,8 +1186,8 @@ github.com/onflow/flow-go-sdk v0.40.0 h1:s8uwoyTquN8tjdXpqGmNkXTjf79yUII8JExc5QE github.com/onflow/flow-go-sdk v0.40.0/go.mod h1:34dxXk9Hp/bQw6Zy6+H44Xo0kQU+aJyQoqdDxq00rJM= github.com/onflow/flow-go/crypto v0.24.7 h1:RCLuB83At4z5wkAyUCF7MYEnPoIIOHghJaODuJyEoW0= github.com/onflow/flow-go/crypto v0.24.7/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7Q6u+bCI78lfNX0= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d h1:Wl8bE1YeZEcRNnCpxw2rikOEaivuYKDrnJd2vsfIWoA= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230330183547-d0dd18f6f20d/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 h1:QxQxCgce0tvAn/ibnEVYcUFRpy9QLxdfLRavKWYptvU= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8 h1:XcSR/n2aSVO7lOEsKScYALcpHlfowLwicZ9yVbL6bnA= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8/go.mod h1:73C8FlT4L/Qe4Cf5iXUNL8b2pvu4zs5dJMMJ5V2TjUI= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/integration/go.mod b/integration/go.mod index b1ae92ab43b..1eaa7d3948c 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -24,7 +24,7 @@ require ( github.com/onflow/flow-go-sdk v0.40.0 github.com/onflow/flow-go/crypto v0.24.7 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230407005012-727d541fd5f8 + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 github.com/plus3it/gorecurcopy v0.0.1 github.com/prometheus/client_golang v1.14.0 github.com/rs/zerolog v1.29.0 diff --git a/integration/go.sum b/integration/go.sum index 35c6fbd3bef..5f99e1d31bf 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1318,8 +1318,8 @@ github.com/onflow/flow-go-sdk v0.40.0 h1:s8uwoyTquN8tjdXpqGmNkXTjf79yUII8JExc5QE github.com/onflow/flow-go-sdk v0.40.0/go.mod h1:34dxXk9Hp/bQw6Zy6+H44Xo0kQU+aJyQoqdDxq00rJM= github.com/onflow/flow-go/crypto v0.24.7 h1:RCLuB83At4z5wkAyUCF7MYEnPoIIOHghJaODuJyEoW0= github.com/onflow/flow-go/crypto v0.24.7/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7Q6u+bCI78lfNX0= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230407005012-727d541fd5f8 h1:O8uM6GVVMhRwBtYaGl93+tDSu6vWqUc47b12fPkZGXk= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230407005012-727d541fd5f8/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1 h1:QxQxCgce0tvAn/ibnEVYcUFRpy9QLxdfLRavKWYptvU= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230424214110-4f04b71ea3e1/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8 h1:XcSR/n2aSVO7lOEsKScYALcpHlfowLwicZ9yVbL6bnA= github.com/onflow/go-bitswap v0.0.0-20221017184039-808c5791a8a8/go.mod h1:73C8FlT4L/Qe4Cf5iXUNL8b2pvu4zs5dJMMJ5V2TjUI= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/network/p2p/tracer/gossipSubScoreTracer.go b/network/p2p/tracer/gossipSubScoreTracer.go index aae023099d7..facdc8bd182 100644 --- a/network/p2p/tracer/gossipSubScoreTracer.go +++ b/network/p2p/tracer/gossipSubScoreTracer.go @@ -224,7 +224,7 @@ func (g *GossipSubScoreTracer) logPeerScore(peerID peer.ID) bool { Str("role", identity.Role.String()).Logger() } - lg = g.logger.With(). + lg = lg.With(). Str("peer_id", peerID.String()). Float64("overall_score", snapshot.Score). Float64("app_specific_score", snapshot.AppSpecificScore).