diff --git a/core/client.go b/core/client.go deleted file mode 100644 index 2bef6dba0c..0000000000 --- a/core/client.go +++ /dev/null @@ -1,67 +0,0 @@ -package core - -import ( - "fmt" - - coregrpc "github.com/tendermint/tendermint/rpc/grpc" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// Client is a core gRPC client. -type Client struct { - coregrpc.BlockAPIClient - host, port string - conn *grpc.ClientConn -} - -// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC. -// The connection is not started when creating the client. -// Use the Start method to start the connection. -func NewClient(host, port string) *Client { - return &Client{ - host: host, - port: port, - } -} - -// Start created the Client's gRPC connection with optional dial options. -// If the connection is already started, it does nothing. -func (c *Client) Start(opts ...grpc.DialOption) error { - if c.IsRunning() { - return nil - } - if len(opts) == 0 { - opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - } - conn, err := grpc.NewClient( - fmt.Sprintf("%s:%s", c.host, c.port), - opts..., - ) - if err != nil { - return err - } - c.conn = conn - - c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn) - return nil -} - -// IsRunning checks if the client's connection is established and ready for use. -// It returns true if the connection is active, false otherwise. -func (c *Client) IsRunning() bool { - return c.conn != nil && c.BlockAPIClient != nil -} - -// Stop terminates the Client's gRPC connection and releases all related resources. -// If the connection is already stopped, it does nothing. -func (c *Client) Stop() error { - if !c.IsRunning() { - return nil - } - defer func() { - c.conn = nil - c.BlockAPIClient = nil - }() - return c.conn.Close() -} diff --git a/core/exchange_test.go b/core/exchange_test.go index 5ed165b857..b791e6da8a 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -170,8 +170,7 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn require.NoError(t, err) host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) return fetcher, cctx diff --git a/core/fetcher.go b/core/fetcher.go index 19f7c12868..6f049cc7ac 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -2,7 +2,6 @@ package core import ( "context" - "errors" "fmt" "io" "sync/atomic" @@ -13,14 +12,13 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" + "google.golang.org/grpc" libhead "github.com/celestiaorg/go-header" ) const newBlockSubscriber = "NewBlock/Events" -var ErrClientNotRunning = errors.New("gRPC connection to core node is not running") - type SignedBlock struct { Header *types.Header `json:"header"` Commit *types.Commit `json:"commit"` @@ -34,7 +32,7 @@ var ( ) type BlockFetcher struct { - client *Client + client coregrpc.BlockAPIClient doneCh chan struct{} cancel context.CancelFunc @@ -42,9 +40,9 @@ type BlockFetcher struct { } // NewBlockFetcher returns a new `BlockFetcher`. -func NewBlockFetcher(client *Client) (*BlockFetcher, error) { +func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) { return &BlockFetcher{ - client: client, + client: coregrpc.NewBlockAPIClient(conn), }, nil } @@ -62,10 +60,6 @@ func (f *BlockFetcher) Stop(ctx context.Context) error { // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, nil, ErrClientNotRunning - } commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -87,10 +81,6 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C // GetBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -103,10 +93,6 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock } func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } if hash == nil { return nil, fmt.Errorf("cannot get block with nil hash") } @@ -125,10 +111,6 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (* // GetSignedBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height. func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -140,10 +122,6 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe // the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height}) if err != nil { return nil, err @@ -165,10 +143,6 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, // block at the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) if err != nil { return nil, err @@ -189,10 +163,6 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return nil, ErrClientNotRunning - } if f.isListeningForBlocks.Load() { return nil, fmt.Errorf("already subscribed to new blocks") } @@ -252,10 +222,6 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { - // return error if the client is still not started - if !f.client.IsRunning() { - return false, ErrClientNotRunning - } resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) if err != nil { return false, err diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 2f34e4bc05..d184fb8b91 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -22,8 +22,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) { node := StartTestNode(t) host, port, err := net.SplitHostPort(node.GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 14eeab0bd7..8d7659494d 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -16,11 +16,9 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) diff --git a/core/header_test.go b/core/header_test.go index d3c8ab116a..dcc5dba9e2 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -23,11 +23,9 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := NewClient(host, port) - require.NoError(t, client.Start()) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) <-sub diff --git a/core/testing.go b/core/testing.go index d4b5f6334b..586bf57f83 100644 --- a/core/testing.go +++ b/core/testing.go @@ -1,10 +1,16 @@ package core import ( + "context" + "net" "testing" "time" + "github.com/stretchr/testify/require" tmrand "github.com/tendermint/tendermint/libs/rand" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" @@ -60,3 +66,16 @@ func generateRandomAccounts(n int) []string { } return accounts } + +func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn { + t.Helper() + opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + endpoint := net.JoinHostPort(ip, port) + client, err := grpc.NewClient(endpoint, opt) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + t.Cleanup(cancel) + ready := client.WaitForStateChange(ctx, connectivity.Ready) + require.True(t, ready) + return client +} diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index ae598a6029..e4e7593e3f 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -1,9 +1,115 @@ package core import ( - "github.com/celestiaorg/celestia-node/core" + "context" + "crypto/tls" + "encoding/json" + "errors" + "net" + "os" + "path/filepath" + + "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" + + "github.com/celestiaorg/celestia-node/libs/utils" ) -func remote(cfg Config) *core.Client { - return core.NewClient(cfg.IP, cfg.Port) +const xtokenFileName = "xtoken.json" + +func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) { + var opts []grpc.DialOption + if cfg.TLSEnabled { + opts = append(opts, grpc.WithTransportCredentials( + credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})), + ) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + if cfg.XTokenPath != "" { + xToken, err := parseTokenPath(cfg.XTokenPath) + if err != nil { + return nil, err + } + opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken))) + opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken))) + } + + endpoint := net.JoinHostPort(cfg.IP, cfg.Port) + conn, err := grpc.NewClient(endpoint, opts...) + if err != nil { + return nil, err + } + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + conn.Connect() + if !conn.WaitForStateChange(ctx, connectivity.Ready) { + return errors.New("couldn't connect to core endpoint") + } + return nil + }, + OnStop: func(context.Context) error { + return conn.Close() + }, + }) + return conn, nil +} + +func authInterceptor(xtoken string) grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func authStreamInterceptor(xtoken string) grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) + return streamer(ctx, desc, cc, method, opts...) + } +} + +// parseTokenPath retrieves the authentication token from a JSON file at the specified path. +func parseTokenPath(xtokenPath string) (string, error) { + xtokenPath = filepath.Join(xtokenPath, xtokenFileName) + exist := utils.Exists(xtokenPath) + if !exist { + return "", os.ErrNotExist + } + + token, err := os.ReadFile(xtokenPath) + if err != nil { + return "", err + } + + auth := struct { + Token string `json:"x-token"` + }{} + + err = json.Unmarshal(token, &auth) + if err != nil { + return "", err + } + if auth.Token == "" { + return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") + } + return auth.Token, nil } diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 61a4e3468c..a81365659a 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -25,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), + fx.Provide(grpcClient), fx.Options(options...), ) @@ -74,15 +75,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option return listener.Stop(ctx) }), )), - fx.Provide(fx.Annotate( - remote, - fx.OnStart(func(_ context.Context, client *core.Client) error { - return client.Start() - }), - fx.OnStop(func(_ context.Context, client *core.Client) error { - return client.Stop() - }), - )), ) default: panic("invalid node type") diff --git a/nodebuilder/core/opts.go b/nodebuilder/core/opts.go index 26b25b4541..5de789daa9 100644 --- a/nodebuilder/core/opts.go +++ b/nodebuilder/core/opts.go @@ -2,15 +2,15 @@ package core import ( "go.uber.org/fx" + "google.golang.org/grpc" - "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/libs/fxutil" ) -// WithClient sets a custom client for core process -func WithClient(client *core.Client) fx.Option { - return fxutil.ReplaceAs(client, new(core.Client)) +// WithConnection sets a custom client for core process +func WithConnection(conn *grpc.ClientConn) fx.Option { + return fxutil.ReplaceAs(conn, new(grpc.ClientConn)) } // WithHeaderConstructFn sets custom func that creates extended header diff --git a/nodebuilder/core/tls.go b/nodebuilder/core/tls.go deleted file mode 100644 index f280d0cfd0..0000000000 --- a/nodebuilder/core/tls.go +++ /dev/null @@ -1,44 +0,0 @@ -package core - -import ( - "crypto/tls" - "encoding/json" - "errors" - "os" - "path/filepath" - - "github.com/celestiaorg/celestia-node/libs/utils" -) - -const xtokenFileName = "xtoken.json" - -func EmptyTLSConfig() *tls.Config { - return &tls.Config{MinVersion: tls.VersionTLS12} -} - -// XToken retrieves the authentication token from a JSON file at the specified path. -func XToken(xtokenPath string) (string, error) { - xtokenPath = filepath.Join(xtokenPath, xtokenFileName) - exist := utils.Exists(xtokenPath) - if !exist { - return "", os.ErrNotExist - } - - token, err := os.ReadFile(xtokenPath) - if err != nil { - return "", err - } - - auth := struct { - Token string `json:"x-token"` - }{} - - err = json.Unmarshal(token, &auth) - if err != nil { - return "", err - } - if auth.Token == "" { - return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath") - } - return auth.Token, nil -} diff --git a/nodebuilder/module.go b/nodebuilder/module.go index 5a774b8b9b..43de56eedd 100644 --- a/nodebuilder/module.go +++ b/nodebuilder/module.go @@ -42,14 +42,14 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store fx.Supply(store.Config), fx.Provide(store.Datastore), fx.Provide(store.Keystore), + core.ConstructModule(tp, &cfg.Core), fx.Supply(node.StorePath(store.Path())), // modules provided by the node p2p.ConstructModule(tp, &cfg.P2P), - state.ConstructModule(tp, &cfg.State, &cfg.Core), modhead.ConstructModule[*header.ExtendedHeader](tp, &cfg.Header), share.ConstructModule(tp, &cfg.Share), gateway.ConstructModule(tp, &cfg.Gateway), - core.ConstructModule(tp, &cfg.Core), + state.ConstructModule(tp, &cfg.State, &cfg.Core), das.ConstructModule(tp, &cfg.DASer), fraud.ConstructModule(tp), blob.ConstructModule(), diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index a4cac93c99..5647a6afa1 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -6,6 +6,8 @@ import ( "testing" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-node/core" coremodule "github.com/celestiaorg/celestia-node/nodebuilder/core" @@ -20,11 +22,14 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - client := core.NewClient(host, port) - require.NoError(t, client.Start()) - node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithConnection(con)) require.NoError(t, err) require.NotNil(t, node) err = node.Start(ctx) diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index c55f0da01b..d66da88c44 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -1,16 +1,13 @@ package state import ( - "errors" - "os" - "github.com/cosmos/cosmos-sdk/crypto/keyring" + "google.golang.org/grpc" libfraud "github.com/celestiaorg/go-fraud" "github.com/celestiaorg/go-header/sync" "github.com/celestiaorg/celestia-node/header" - "github.com/celestiaorg/celestia-node/nodebuilder/core" modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/share/eds/byzantine" @@ -20,30 +17,19 @@ import ( // coreAccessor constructs a new instance of state.Module over // a celestia-core connection. func coreAccessor( - corecfg core.Config, keyring keyring.Keyring, keyname AccountName, sync *sync.Syncer[*header.ExtendedHeader], fraudServ libfraud.Service[*header.ExtendedHeader], network p2p.Network, - opts []state.Option, + client *grpc.ClientConn, ) ( *state.CoreAccessor, Module, *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], error, ) { - if corecfg.TLSEnabled { - tlsCfg := core.EmptyTLSConfig() - xtoken, err := core.XToken(corecfg.XTokenPath) - if err != nil && !errors.Is(err, os.ErrNotExist) { - return nil, nil, nil, err - } - opts = append(opts, state.WithTLSConfig(tlsCfg), state.WithXToken(xtoken)) - } - - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, - corecfg.IP, corecfg.Port, network.String(), opts...) + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, client, network.String()) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ Service: ca, diff --git a/nodebuilder/state/module.go b/nodebuilder/state/module.go index bd6f2081d3..0e80ab3209 100644 --- a/nodebuilder/state/module.go +++ b/nodebuilder/state/module.go @@ -23,11 +23,9 @@ var log = logging.Logger("module/state") func ConstructModule(tp node.Type, cfg *Config, coreCfg *core.Config) fx.Option { // sanitize config values before constructing module cfgErr := cfg.Validate() - opts := make([]state.Option, 0) baseComponents := fx.Options( fx.Supply(*cfg), fx.Error(cfgErr), - fx.Supply(opts), fx.Provide(func(ks keystore.Keystore) (keyring.Keyring, AccountName, error) { return Keyring(*cfg, ks) }), diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index d205fca120..44c026f007 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -8,6 +8,8 @@ import ( "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/stretchr/testify/require" "go.uber.org/fx" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" libhead "github.com/celestiaorg/go-header" @@ -73,12 +75,15 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti // in fact, we don't need core.Client in tests, but the Bridge node requires a valid one. // otherwise, it fails with a failed attempt to connect with a custom build client. if tp == node.Bridge { - host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + _, _, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + con, err := grpc.NewClient( + core.StartTestNode(t).GRPCClient.Target(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) require.NoError(t, err) - client := core.NewClient(host, port) - require.NoError(t, client.Start()) opts = append(opts, - fxutil.ReplaceAs(client, new(core.Client)), + fxutil.ReplaceAs(con, new(grpc.ClientConn)), ) } diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 1f4a56b319..3fe779e76d 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -18,6 +18,8 @@ import ( "github.com/tendermint/tendermint/types" "go.uber.org/fx" "golang.org/x/exp/maps" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/test/util/testnode" libhead "github.com/celestiaorg/go-header" @@ -181,9 +183,13 @@ func (s *Swamp) setupGenesis() { host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) require.NoError(s.t, err) - client := core.NewClient(host, port) - require.NoError(s.t, client.Start()) - fetcher, err := core.NewBlockFetcher(client) + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) + fetcher, err := core.NewBlockFetcher(con) require.NoError(s.t, err) ex, err := core.NewExchange( @@ -292,12 +298,14 @@ func (s *Swamp) NewNodeWithStore( if err != nil { return nil, err } - client := core.NewClient(host, port) - if err := client.Start(); err != nil { - return nil, err - } + addr := net.JoinHostPort(host, port) + con, err := grpc.NewClient( + addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(s.t, err) options = append(options, - coremodule.WithClient(client), + coremodule.WithConnection(con), ) default: } diff --git a/state/core_access.go b/state/core_access.go index 80e6a8eccc..07747f5333 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -2,10 +2,8 @@ package state import ( "context" - "crypto/tls" "errors" "fmt" - "net" "sync" "time" @@ -21,10 +19,6 @@ import ( "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/proto/tendermint/crypto" "google.golang.org/grpc" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/metadata" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/app/encoding" @@ -46,22 +40,6 @@ var ( log = logging.Logger("state") ) -// Option is the functional option that is applied to the coreAccessor instance -// to configure parameters. -type Option func(ca *CoreAccessor) - -func WithTLSConfig(cfg *tls.Config) Option { - return func(ca *CoreAccessor) { - ca.tls = cfg - } -} - -func WithXToken(xtoken string) Option { - return func(ca *CoreAccessor) { - ca.xtoken = xtoken - } -} - // CoreAccessor implements service over a gRPC connection // with a celestia-core node. type CoreAccessor struct { @@ -84,13 +62,8 @@ type CoreAccessor struct { prt *merkle.ProofRuntime coreConn *grpc.ClientConn - coreIP string - port string network string - tls *tls.Config - xtoken string - // these fields are mutatable and thus need to be protected by a mutex lock sync.Mutex lastPayForBlob int64 @@ -109,8 +82,8 @@ func NewCoreAccessor( keyring keyring.Keyring, keyname string, getter libhead.Head[*header.ExtendedHeader], - coreIP, port, network string, - options ...Option, + conn *grpc.ClientConn, + network string, ) (*CoreAccessor, error) { // create verifier prt := merkle.DefaultProofRuntime() @@ -121,33 +94,18 @@ func NewCoreAccessor( keyring: keyring, defaultSignerAccount: keyname, getter: getter, - coreIP: coreIP, - port: port, prt: prt, + coreConn: conn, network: network, } - - for _, opt := range options { - opt(ca) - } return ca, nil } func (ca *CoreAccessor) Start(ctx context.Context) error { - if ca.coreConn != nil { - return fmt.Errorf("core-access: already connected to core endpoint") - } ca.ctx, ca.cancel = context.WithCancel(context.Background()) - - err := ca.startGRPCClient(ctx) - if err != nil { - return fmt.Errorf("failed to start grpc client: %w", err) - } - // create the staking query client ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.feeGrantCli = feegrant.NewQueryClient(ca.coreConn) - // create ABCI query client ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) resp, err := ca.abciQueryCli.GetNodeInfo(ctx, &tmservice.GetNodeInfoRequest{}) @@ -175,29 +133,8 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { } func (ca *CoreAccessor) Stop(context.Context) error { - if ca.cancel == nil { - log.Warn("core accessor already stopped") - return nil - } - if ca.coreConn == nil { - log.Warn("no connection found to close") - return nil - } - defer ca.cancelCtx() - - // close out core connection - err := ca.coreConn.Close() - if err != nil { - return err - } - - ca.coreConn = nil - return nil -} - -func (ca *CoreAccessor) cancelCtx() { ca.cancel() - ca.cancel = nil + return nil } // SubmitPayForBlob builds, signs, and synchronously submits a MsgPayForBlob with additional @@ -605,40 +542,6 @@ func (ca *CoreAccessor) setupTxClient(ctx context.Context, keyName string) (*use ) } -func (ca *CoreAccessor) startGRPCClient(ctx context.Context) error { - // dial given celestia-core endpoint - endpoint := net.JoinHostPort(ca.coreIP, ca.port) - // By default, the gRPC client is configured to handle an insecure connection. - // If the TLS configuration is not empty, it will be applied to the client's options. - // If the TLS configuration is empty but the X-Token is provided, - // the X-Token will be applied as an interceptor along with an empty TLS configuration. - opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - if ca.tls != nil { - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(ca.tls))) - } - if ca.xtoken != "" { - opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(ca.xtoken))) - } - - client, err := grpc.NewClient( - endpoint, - opts..., - ) - if err != nil { - return err - } - // this ensures we can't start the node without core connection - client.Connect() - if !client.WaitForStateChange(ctx, connectivity.Ready) { - // hits the case when context is canceled - return fmt.Errorf("couldn't connect to core endpoint(%s): %w", endpoint, ctx.Err()) - } - ca.coreConn = client - - log.Infof("Connection with core endpoint(%s) established", endpoint) - return nil -} - func (ca *CoreAccessor) submitMsg( ctx context.Context, msg sdktypes.Msg, @@ -695,17 +598,3 @@ func convertToSdkTxResponse(resp *user.TxResponse) *TxResponse { Height: resp.Height, } } - -func authInterceptor(xtoken string) grpc.UnaryClientInterceptor { - return func( - ctx context.Context, - method string, - req, reply interface{}, - cc *grpc.ClientConn, - invoker grpc.UnaryInvoker, - opts ...grpc.CallOption, - ) error { - ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken) - return invoker(ctx, method, req, reply, cc, opts...) - } -} diff --git a/state/core_access_test.go b/state/core_access_test.go index c487944749..cde4e8182f 100644 --- a/state/core_access_test.go +++ b/state/core_access_test.go @@ -6,13 +6,14 @@ import ( "context" "errors" "fmt" - "strings" "testing" "time" sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "github.com/celestiaorg/celestia-app/v3/app" "github.com/celestiaorg/celestia-app/v3/pkg/appconsts" @@ -216,11 +217,6 @@ func TestDelegate(t *testing.T) { } } -func extractPort(addr string) string { - splitStr := strings.Split(addr, ":") - return splitStr[len(splitStr)-1] -} - func buildAccessor(t *testing.T) (*CoreAccessor, []string) { chainID := "private" @@ -264,7 +260,9 @@ func buildAccessor(t *testing.T) (*CoreAccessor, []string) { WithAppCreator(appCreator) // needed until https://github.com/celestiaorg/celestia-app/pull/3680 merges cctx, _, grpcAddr := testnode.NewNetwork(t, config) - ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, "127.0.0.1", extractPort(grpcAddr), chainID) + conn, err := grpc.NewClient(grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + ca, err := NewCoreAccessor(cctx.Keyring, accounts[0].Name, nil, conn, chainID) require.NoError(t, err) return ca, getNames(accounts) } diff --git a/state/integration_test.go b/state/integration_test.go index 8680b4d181..f606841417 100644 --- a/state/integration_test.go +++ b/state/integration_test.go @@ -52,8 +52,11 @@ func (s *IntegrationTestSuite) SetupSuite() { s.Require().Greater(len(s.accounts), 0) accountName := s.accounts[0].Name - accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, "", "", "") + accessor, err := NewCoreAccessor(s.cctx.Keyring, accountName, localHeader{s.cctx.Client}, nil, "") require.NoError(s.T(), err) + ctx, cancel := context.WithCancel(context.Background()) + accessor.ctx = ctx + accessor.cancel = cancel setClients(accessor, s.cctx.GRPCClient) s.accessor = accessor @@ -65,8 +68,7 @@ func (s *IntegrationTestSuite) SetupSuite() { func setClients(ca *CoreAccessor, conn *grpc.ClientConn) { ca.coreConn = conn // create the staking query client - stakingCli := stakingtypes.NewQueryClient(ca.coreConn) - ca.stakingCli = stakingCli + ca.stakingCli = stakingtypes.NewQueryClient(ca.coreConn) ca.abciQueryCli = tmservice.NewServiceClient(ca.coreConn) }