diff --git a/runtime/v2/app.go b/runtime/v2/app.go index 8df0bdb7dedf..014849c497b0 100644 --- a/runtime/v2/app.go +++ b/runtime/v2/app.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" + gogoproto "github.com/cosmos/gogoproto/proto" "golang.org/x/exp/slices" runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2" @@ -44,6 +45,10 @@ type App[T transaction.Tx] struct { interfaceRegistrar registry.InterfaceRegistrar amino legacy.Amino moduleManager *MM[T] + + // GRPCQueryDecoders maps gRPC method name to a function that decodes the request + // bytes into a gogoproto.Message, which then can be passed to appmanager. + GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) } // Logger returns the app logger. @@ -109,3 +114,7 @@ func (a *App[T]) ExecuteGenesisTx(_ []byte) error { func (a *App[T]) GetAppManager() *appmanager.AppManager[T] { return a.AppManager } + +func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) { + return a.GRPCQueryDecoders +} diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 0b139c95bc35..ff9102085bad 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "reflect" "sort" gogoproto "github.com/cosmos/gogoproto/proto" @@ -555,17 +556,28 @@ func (m *MM[T]) assertNoForgottenModules( func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error { c := &configurator{ - stfQueryRouter: app.queryRouterBuilder, - stfMsgRouter: app.msgRouterBuilder, - registry: registry, - err: nil, + grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){}, + stfQueryRouter: app.queryRouterBuilder, + stfMsgRouter: app.msgRouterBuilder, + registry: registry, + err: nil, } - return s.RegisterServices(c) + + err := s.RegisterServices(c) + if err != nil { + return fmt.Errorf("unable to register services: %w", err) + } + app.GRPCQueryDecoders = c.grpcQueryDecoders + return nil } var _ grpc.ServiceRegistrar = (*configurator)(nil) type configurator struct { + // grpcQueryDecoders is required because module expose queries through gRPC + // this provides a way to route to modules using gRPC. + grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error) + stfQueryRouter *stf.MsgRouterBuilder stfMsgRouter *stf.MsgRouterBuilder registry *protoregistry.Files @@ -596,17 +608,28 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) { func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error { for _, md := range sd.Methods { // TODO(tip): what if a query is not deterministic? - err := registerMethod(c.stfQueryRouter, sd, md, ss) + requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss) if err != nil { return fmt.Errorf("unable to register query handler %s: %w", md.MethodName, err) } + + // register gRPC query method. + typ := gogoproto.MessageType(requestFullName) + if typ == nil { + return fmt.Errorf("unable to find message in gogotype registry: %w", err) + } + decoderFunc := func(bytes []byte) (gogoproto.Message, error) { + msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message) + return msg, gogoproto.Unmarshal(bytes, msg) + } + c.grpcQueryDecoders[md.MethodName] = decoderFunc } return nil } func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error { for _, md := range sd.Methods { - err := registerMethod(c.stfMsgRouter, sd, md, ss) + _, err := registerMethod(c.stfMsgRouter, sd, md, ss) if err != nil { return fmt.Errorf("unable to register msg handler %s: %w", md.MethodName, err) } @@ -633,13 +656,13 @@ func registerMethod( sd *grpc.ServiceDesc, md grpc.MethodDesc, ss interface{}, -) error { +) (string, error) { requestName, err := requestFullNameFromMethodDesc(sd, md) if err != nil { - return err + return "", err } - return stfRouter.RegisterHandler(string(requestName), func( + return string(requestName), stfRouter.RegisterHandler(string(requestName), func( ctx context.Context, msg appmodulev2.Message, ) (resp appmodulev2.Message, err error) { diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index c500d47e1edb..f63953b0c8d3 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -9,6 +9,7 @@ import ( abci "github.com/cometbft/cometbft/abci/types" abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1" + gogoproto "github.com/cosmos/gogoproto/proto" coreappmgr "cosmossdk.io/core/app" "cosmossdk.io/core/comet" @@ -31,6 +32,9 @@ import ( var _ abci.Application = (*Consensus[transaction.Tx])(nil) type Consensus[T transaction.Tx] struct { + // legacy support for gRPC + grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) + app *appmanager.AppManager[T] cfg Config store types.Store @@ -56,18 +60,28 @@ type Consensus[T transaction.Tx] struct { func NewConsensus[T transaction.Tx]( app *appmanager.AppManager[T], mp mempool.Mempool[T], + grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error), store types.Store, cfg Config, txCodec transaction.Codec[T], logger log.Logger, ) *Consensus[T] { return &Consensus[T]{ - mempool: mp, - store: store, - app: app, - cfg: cfg, - txCodec: txCodec, - logger: logger, + grpcQueryDecoders: grpcQueryDecoders, + app: app, + cfg: cfg, + store: store, + logger: logger, + txCodec: txCodec, + streaming: streaming.Manager{}, + snapshotManager: nil, + mempool: mp, + lastCommittedHeight: atomic.Int64{}, + prepareProposalHandler: nil, + processProposalHandler: nil, + verifyVoteExt: nil, + extendVote: nil, + chainID: "", } } @@ -150,18 +164,16 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // Query implements types.Application. // It is called by cometbft to query application state. -func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (*abciproto.QueryResponse, error) { - // follow the query path from here - decodedMsg, err := c.txCodec.Decode(req.Data) - protoMsg, ok := any(decodedMsg).(transaction.Msg) - if !ok { - return nil, fmt.Errorf("decoded type T %T must implement core/transaction.Msg", decodedMsg) - } - - // if no error is returned then we can handle the query with the appmanager - // otherwise it is a KV store query - if err == nil { - res, err := c.app.Query(ctx, uint64(req.Height), protoMsg) +func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { + // check if it's a gRPC method + grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path] + if isGRPC { + protoRequest, err := grpcQueryDecoder(req.Data) + if err != nil { + return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) + } + res, err := c.app.Query(ctx, uint64(req.Height), protoRequest) + if err != nil { resp := queryResult(err) resp.Height = req.Height @@ -179,8 +191,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) ( return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil } - var resp *abciproto.QueryResponse - switch path[0] { case cmtservice.QueryPathApp: resp, err = c.handlerQueryApp(ctx, path, req) @@ -391,7 +401,7 @@ func (c *Consensus[T]) FinalizeBlock( // ProposerAddress: req.ProposerAddress, // LastCommit: req.DecidedLastCommit, // }, - //} + // } // // ctx = context.WithValue(ctx, corecontext.CometInfoKey, &comet.Info{ // Evidence: sdktypes.ToSDKEvidence(req.Misbehavior), diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index f7ab69af05b6..9b84a776400c 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -60,7 +60,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l // create consensus store := appI.GetStore().(types.Store) - consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, store, s.config, s.initTxCodec, s.logger) + consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, appI.GetGRPCQueryDecoders(), store, s.config, s.initTxCodec, s.logger) consensus.prepareProposalHandler = s.options.PrepareProposalHandler consensus.processProposalHandler = s.options.ProcessProposalHandler diff --git a/server/v2/types.go b/server/v2/types.go index 034f333db76a..3382d1b27b6b 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -1,6 +1,7 @@ package serverv2 import ( + gogoproto "github.com/cosmos/gogoproto/proto" "github.com/spf13/viper" coreapp "cosmossdk.io/core/app" @@ -15,5 +16,6 @@ type AppI[T transaction.Tx] interface { GetAppManager() *appmanager.AppManager[T] GetConsensusAuthority() string InterfaceRegistry() coreapp.InterfaceRegistry + GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) GetStore() any } diff --git a/simapp/v2/app_di.go b/simapp/v2/app_di.go index b1e05095d904..5f4b2455bc8d 100644 --- a/simapp/v2/app_di.go +++ b/simapp/v2/app_di.go @@ -224,7 +224,6 @@ func NewSimApp[T transaction.Tx]( if err := app.LoadLatest(); err != nil { panic(err) } - return app } diff --git a/simapp/v2/go.mod b/simapp/v2/go.mod index 8fc91127cfb2..1d641dca6ca7 100644 --- a/simapp/v2/go.mod +++ b/simapp/v2/go.mod @@ -4,7 +4,7 @@ go 1.22.2 require ( cosmossdk.io/api v0.7.5 - cosmossdk.io/collections v0.4.0 // indirect + cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000 cosmossdk.io/core v0.12.1-0.20231114100755-569e3ff6a0d7 cosmossdk.io/depinject v1.0.0-alpha.4 cosmossdk.io/log v1.3.1 @@ -13,6 +13,7 @@ require ( cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000 cosmossdk.io/server/v2/cometbft v0.0.0-00010101000000-000000000000 cosmossdk.io/store/v2 v2.0.0 + cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000 cosmossdk.io/x/accounts v0.0.0-20240226161501-23359a0b6d91 cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000 cosmossdk.io/x/authz v0.0.0-00010101000000-000000000000 @@ -29,28 +30,18 @@ require ( cosmossdk.io/x/protocolpool v0.0.0-20230925135524-a1bc045b3190 cosmossdk.io/x/slashing v0.0.0-00010101000000-000000000000 cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 - cosmossdk.io/x/tx v0.13.3 // indirect cosmossdk.io/x/upgrade v0.0.0-20230613133644-0a778132a60f github.com/cometbft/cometbft v1.0.0-rc1 github.com/cosmos/cosmos-db v1.0.2 // this version is not used as it is always replaced by the latest Cosmos SDK version github.com/cosmos/cosmos-sdk v0.51.0 - github.com/cosmos/gogoproto v1.5.0 // indirect - github.com/golang/mock v1.6.0 // indirect - github.com/spf13/cast v1.6.0 // indirect github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 - golang.org/x/sync v0.7.0 // indirect google.golang.org/protobuf v1.34.2 ) -require ( - cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000 - cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000 -) - require ( buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.34.2-20240701160653-fedbb9acfd2f.2 // indirect buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect @@ -60,6 +51,7 @@ require ( cloud.google.com/go/compute/metadata v0.3.0 // indirect cloud.google.com/go/iam v1.1.8 // indirect cloud.google.com/go/storage v1.42.0 // indirect + cosmossdk.io/collections v0.4.0 // indirect cosmossdk.io/core/testing v0.0.0-00010101000000-000000000000 // indirect cosmossdk.io/errors v1.0.1 // indirect cosmossdk.io/schema v0.1.1 // indirect @@ -69,6 +61,7 @@ require ( cosmossdk.io/x/accounts/defaults/lockup v0.0.0-20240417181816-5e7aae0db1f5 // indirect cosmossdk.io/x/accounts/defaults/multisig v0.0.0-00010101000000-000000000000 // indirect cosmossdk.io/x/epochs v0.0.0-20240522060652-a1ae4c3e0337 // indirect + cosmossdk.io/x/tx v0.13.3 // indirect filippo.io/edwards25519 v1.1.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.2 // indirect @@ -97,6 +90,7 @@ require ( github.com/cosmos/crypto v0.1.1 // indirect github.com/cosmos/go-bip39 v1.0.0 // indirect github.com/cosmos/gogogateway v1.2.0 // indirect + github.com/cosmos/gogoproto v1.5.0 // indirect github.com/cosmos/iavl v1.2.0 // indirect github.com/cosmos/ics23/go v0.10.0 // indirect github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect @@ -125,6 +119,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect @@ -200,6 +195,7 @@ require ( github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect + github.com/spf13/cast v1.6.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/supranational/blst v0.3.12 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect @@ -223,6 +219,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect