Skip to content

Commit

Permalink
refactor(core): unify grpc clients (#3999)
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs authored Dec 13, 2024
1 parent b5fc555 commit 872d422
Show file tree
Hide file tree
Showing 20 changed files with 194 additions and 337 deletions.
67 changes: 0 additions & 67 deletions core/client.go

This file was deleted.

3 changes: 1 addition & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
require.NoError(t, err)
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)
return fetcher, cctx
Expand Down
42 changes: 4 additions & 38 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package core

import (
"context"
"errors"
"fmt"
"io"
"sync/atomic"
Expand All @@ -13,14 +12,13 @@ import (
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"

libhead "github.com/celestiaorg/go-header"
)

const newBlockSubscriber = "NewBlock/Events"

var ErrClientNotRunning = errors.New("gRPC connection to core node is not running")

type SignedBlock struct {
Header *types.Header `json:"header"`
Commit *types.Commit `json:"commit"`
Expand All @@ -34,17 +32,17 @@ var (
)

type BlockFetcher struct {
client *Client
client coregrpc.BlockAPIClient

doneCh chan struct{}
cancel context.CancelFunc
isListeningForBlocks atomic.Bool
}

// NewBlockFetcher returns a new `BlockFetcher`.
func NewBlockFetcher(client *Client) (*BlockFetcher, error) {
func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) {
return &BlockFetcher{
client: client,
client: coregrpc.NewBlockAPIClient(conn),
}, nil
}

Expand All @@ -62,10 +60,6 @@ func (f *BlockFetcher) Stop(ctx context.Context) error {

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, nil, ErrClientNotRunning
}
commit, err := f.Commit(ctx, height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err)
Expand All @@ -87,10 +81,6 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C
// GetBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height
func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -103,10 +93,6 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*SignedBlock
}

func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
if hash == nil {
return nil, fmt.Errorf("cannot get block with nil hash")
}
Expand All @@ -125,10 +111,6 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*
// GetSignedBlock queries Core for a `Block` at the given height.
// if the height is nil, use the latest height.
func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -140,10 +122,6 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe
// the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -165,10 +143,6 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit,
// block at the given height.
// If the height is nil, use the latest height.
func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height})
if err != nil {
return nil, err
Expand All @@ -189,10 +163,6 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V
// SubscribeNewBlockEvent subscribes to new block events from Core, returning
// a new block event channel on success.
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return nil, ErrClientNotRunning
}
if f.isListeningForBlocks.Load() {
return nil, fmt.Errorf("already subscribed to new blocks")
}
Expand Down Expand Up @@ -252,10 +222,6 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
// syncing, and false for already caught up. It can also return an error
// in the case of a failed status request.
func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) {
// return error if the client is still not started
if !f.client.IsRunning() {
return false, ErrClientNotRunning
}
resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{})
if err != nil {
return false, err
Expand Down
3 changes: 1 addition & 2 deletions core/fetcher_no_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func TestBlockFetcherHeaderValues(t *testing.T) {
node := StartTestNode(t)
host, port, err := net.SplitHostPort(node.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

Expand Down
4 changes: 1 addition & 3 deletions core/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

// generate some blocks
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
Expand Down
4 changes: 1 addition & 3 deletions core/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {

host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
client := newTestClient(t, host, port)
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)

sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
<-sub
Expand Down
19 changes: 19 additions & 0 deletions core/testing.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
tmrand "github.com/tendermint/tendermint/libs/rand"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"

"github.com/celestiaorg/celestia-app/v3/test/util/genesis"
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"
Expand Down Expand Up @@ -60,3 +66,16 @@ func generateRandomAccounts(n int) []string {
}
return accounts
}

func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn {
t.Helper()
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
endpoint := net.JoinHostPort(ip, port)
client, err := grpc.NewClient(endpoint, opt)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
t.Cleanup(cancel)
ready := client.WaitForStateChange(ctx, connectivity.Ready)
require.True(t, ready)
return client
}
112 changes: 109 additions & 3 deletions nodebuilder/core/constructors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,115 @@
package core

import (
"github.com/celestiaorg/celestia-node/core"
"context"
"crypto/tls"
"encoding/json"
"errors"
"net"
"os"
"path/filepath"

"go.uber.org/fx"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/celestiaorg/celestia-node/libs/utils"
)

func remote(cfg Config) *core.Client {
return core.NewClient(cfg.IP, cfg.Port)
const xtokenFileName = "xtoken.json"

func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) {
var opts []grpc.DialOption
if cfg.TLSEnabled {
opts = append(opts, grpc.WithTransportCredentials(
credentials.NewTLS(&tls.Config{MinVersion: tls.VersionTLS12})),
)
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
if cfg.XTokenPath != "" {
xToken, err := parseTokenPath(cfg.XTokenPath)
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken)))
opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken)))
}

endpoint := net.JoinHostPort(cfg.IP, cfg.Port)
conn, err := grpc.NewClient(endpoint, opts...)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
conn.Connect()
if !conn.WaitForStateChange(ctx, connectivity.Ready) {
return errors.New("couldn't connect to core endpoint")
}
return nil
},
OnStop: func(context.Context) error {
return conn.Close()
},
})
return conn, nil
}

func authInterceptor(xtoken string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken)
return invoker(ctx, method, req, reply, cc, opts...)
}
}

func authStreamInterceptor(xtoken string) grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
cc *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
ctx = metadata.AppendToOutgoingContext(ctx, "x-token", xtoken)
return streamer(ctx, desc, cc, method, opts...)
}
}

// parseTokenPath retrieves the authentication token from a JSON file at the specified path.
func parseTokenPath(xtokenPath string) (string, error) {
xtokenPath = filepath.Join(xtokenPath, xtokenFileName)
exist := utils.Exists(xtokenPath)
if !exist {
return "", os.ErrNotExist
}

token, err := os.ReadFile(xtokenPath)
if err != nil {
return "", err
}

auth := struct {
Token string `json:"x-token"`
}{}

err = json.Unmarshal(token, &auth)
if err != nil {
return "", err
}
if auth.Token == "" {
return "", errors.New("x-token is empty. Please setup a token or cleanup xtokenPath")
}
return auth.Token, nil
}
Loading

0 comments on commit 872d422

Please sign in to comment.