Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Browse files Browse the repository at this point in the history
…eroua/3129-access-add-circuit-breaker
  • Loading branch information
Guitarheroua committed Jul 21, 2023
2 parents 0d5a2c4 + b53dd43 commit ec9fb75
Show file tree
Hide file tree
Showing 61 changed files with 2,267 additions and 750 deletions.
59 changes: 26 additions & 33 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package access
import (
"context"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
)

type Handler struct {
Expand Down Expand Up @@ -516,7 +516,7 @@ func (h *Handler) GetEventsForHeightRange(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -548,7 +548,7 @@ func (h *Handler) GetEventsForBlockIDs(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,6 +590,27 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.
return executionResultToMessages(result, metadata)
}

// GetExecutionResultByID returns the execution result for the given ID.
func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) {
metadata := h.buildMetadataResponse()

blockID := convert.MessageToIdentifier(req.GetId())

result, err := h.api.GetExecutionResultByID(ctx, blockID)
if err != nil {
return nil, err
}

execResult, err := convert.ExecutionResultToMessage(result)
if err != nil {
return nil, err
}
return &access.ExecutionResultByIDResponse{
ExecutionResult: execResult,
Metadata: metadata,
}, nil
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
metadata := h.buildMetadataResponse()

Expand Down Expand Up @@ -659,34 +680,6 @@ func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Meta
}, nil
}

func blockEventsToMessages(blocks []flow.BlockEvents) ([]*access.EventsResponse_Result, error) {
results := make([]*access.EventsResponse_Result, len(blocks))

for i, block := range blocks {
event, err := blockEventsToMessage(block)
if err != nil {
return nil, err
}
results[i] = event
}

return results, nil
}

func blockEventsToMessage(block flow.BlockEvents) (*access.EventsResponse_Result, error) {
eventMessages := make([]*entities.Event, len(block.Events))
for i, event := range block.Events {
eventMessages[i] = convert.EventToMessage(event)
}
timestamp := timestamppb.New(block.BlockTimestamp)
return &access.EventsResponse_Result{
BlockId: block.BlockID[:],
BlockHeight: block.BlockHeight,
BlockTimestamp: timestamp,
Events: eventMessages,
}, nil
}

// WithBlockSignerDecoder configures the Handler to decode signer indices
// via the provided hotstuff.BlockSignerDecoder
func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) func(*Handler) {
Expand Down
181 changes: 135 additions & 46 deletions cmd/access/node_builder/access_node_builder.go

Large diffs are not rendered by default.

156 changes: 130 additions & 26 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine/access/apiproxy"
restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy"
"github.com/onflow/flow-go/engine/access/rest/routes"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/common/follower"
Expand All @@ -39,6 +41,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/chainsync"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -109,19 +112,22 @@ type ObserverServiceConfig struct {
func DefaultObserverServiceConfig() *ObserverServiceConfig {
return &ObserverServiceConfig{
rpcConf: rpc.Config{
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
BackendConfig: backend.Config{
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
ConnectionPoolSize: backend.DefaultConnectionPoolSize,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
rpcMetricsEnabled: false,
apiRatelimits: nil,
Expand Down Expand Up @@ -162,6 +168,12 @@ type ObserverServiceBuilder struct {

// Public network
peerID peer.ID

RestMetrics *metrics.RestCollector
AccessMetrics module.AccessMetrics
// grpc servers
secureGrpcServer *grpcserver.GrpcServer
unsecureGrpcServer *grpcserver.GrpcServer
}

// deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters.
Expand Down Expand Up @@ -446,7 +458,8 @@ func (builder *ObserverServiceBuilder) extraFlags() {
flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on")
flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)")
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", defaultConfig.rpcConf.MaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringToIntVar(&builder.apiRatelimits, "api-rate-limits", defaultConfig.apiRatelimits, "per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.")
flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.")
flags.StringVar(&builder.observerNetworkingKeyPath, "observer-networking-key-path", defaultConfig.observerNetworkingKeyPath, "path to the networking key for observer")
Expand Down Expand Up @@ -844,11 +857,64 @@ func (builder *ObserverServiceBuilder) enqueueConnectWithStakedAN() {
}

func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.Module("creating grpc servers", func(node *cmd.NodeConfig) error {
builder.secureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger,
builder.rpcConf.SecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build()

builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger,
builder.rpcConf.UnsecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits).Build()

return nil
})
builder.Module("rest metrics", func(node *cmd.NodeConfig) error {
m, err := metrics.NewRestCollector(routes.URLToRoute, node.MetricsRegisterer)
if err != nil {
return err
}
builder.RestMetrics = m
return nil
})
builder.Module("access metrics", func(node *cmd.NodeConfig) error {
builder.AccessMetrics = metrics.NewAccessCollector(
metrics.WithRestMetrics(builder.RestMetrics),
)
return nil
})
builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engineBuilder, err := rpc.NewBuilder(
node.Logger,
accessMetrics := builder.AccessMetrics
config := builder.rpcConf
backendConfig := config.BackendConfig

backendCache, cacheSize, err := backend.NewCache(node.Logger,
accessMetrics,
config.BackendConfig.ConnectionPoolSize)
if err != nil {
return nil, fmt.Errorf("could not initialize backend cache: %w", err)
}

connFactory := &backend.ConnectionFactoryImpl{
CollectionGRPCPort: 0,
ExecutionGRPCPort: 0,
CollectionNodeGRPCTimeout: backendConfig.CollectionClientTimeout,
ExecutionNodeGRPCTimeout: backendConfig.ExecutionClientTimeout,
ConnectionsCache: backendCache,
CacheSize: cacheSize,
MaxMsgSize: config.MaxMsgSize,
AccessMetrics: accessMetrics,
Log: node.Logger,
}

accessBackend := backend.New(
node.State,
builder.rpcConf,
nil,
nil,
node.Storage.Blocks,
Expand All @@ -858,28 +924,56 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
node.Storage.Receipts,
node.Storage.Results,
node.RootChainID,
metrics.NewNoopCollector(),
0,
0,
accessMetrics,
connFactory,
false,
backendConfig.MaxHeightRange,
backendConfig.PreferredExecutionNodeIDs,
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList,
backendConfig.CircuitBreakerConfig.Enabled)

observerCollector := metrics.NewObserverCollector()
restHandler, err := restapiproxy.NewRestProxyHandler(
accessBackend,
builder.upstreamIdentities,
builder.apiTimeout,
config.MaxMsgSize,
builder.Logger,
observerCollector,
node.RootChainID.Chain())
if err != nil {
return nil, err
}

engineBuilder, err := rpc.NewBuilder(
node.Logger,
node.State,
config,
node.RootChainID,
accessMetrics,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
builder.Me,
accessBackend,
restHandler,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
if err != nil {
return nil, err
}

// upstream access node forwarder
forwarder, err := apiproxy.NewFlowAccessAPIForwarder(builder.upstreamIdentities, builder.apiTimeout, builder.rpcConf.MaxMsgSize)
forwarder, err := apiproxy.NewFlowAccessAPIForwarder(builder.upstreamIdentities, builder.apiTimeout, config.MaxMsgSize)
if err != nil {
return nil, err
}

proxy := &apiproxy.FlowAccessAPIRouter{
rpcHandler := &apiproxy.FlowAccessAPIRouter{
Logger: builder.Logger,
Metrics: metrics.NewObserverCollector(),
Metrics: observerCollector,
Upstream: forwarder,
Observer: protocol.NewHandler(protocol.New(
node.State,
Expand All @@ -891,7 +985,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {

// build the rpc engine
builder.RpcEng, err = engineBuilder.
WithNewHandler(proxy).
WithRpcHandler(rpcHandler).
WithLegacy().
Build()
if err != nil {
Expand All @@ -900,6 +994,16 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)
return builder.RpcEng, nil
})

// build secure grpc server
builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})

// build unsecure grpc server
builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.unsecureGrpcServer, nil
})
}

// initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update
Expand Down
Loading

0 comments on commit ec9fb75

Please sign in to comment.