Skip to content

Commit

Permalink
Merge branch 'main' into submit_multiple_blobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sontrinh16 authored Jan 29, 2024
2 parents 62c950b + 1bae060 commit 0f2175c
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 23 deletions.
15 changes: 10 additions & 5 deletions api/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@ import (
var log = logging.Logger("rpc")

type Server struct {
srv *http.Server
rpc *jsonrpc.RPCServer
listener net.Listener
srv *http.Server
rpc *jsonrpc.RPCServer
listener net.Listener
authDisabled bool

started atomic.Bool

auth jwt.Signer
}

func NewServer(address, port string, secret jwt.Signer) *Server {
func NewServer(address, port string, authDisabled bool, secret jwt.Signer) *Server {
rpc := jsonrpc.NewServer()
srv := &Server{
rpc: rpc,
Expand All @@ -38,7 +39,8 @@ func NewServer(address, port string, secret jwt.Signer) *Server {
// the amount of time allowed to read request headers. set to the default 2 seconds
ReadHeaderTimeout: 2 * time.Second,
},
auth: secret,
auth: secret,
authDisabled: authDisabled,
}
srv.srv.Handler = &auth.Handler{
Verify: srv.verifyAuth,
Expand All @@ -51,6 +53,9 @@ func NewServer(address, port string, secret jwt.Signer) *Server {
// reached if a token is provided in the header of the request, otherwise only
// methods with `read` permissions are accessible.
func (s *Server) verifyAuth(_ context.Context, token string) ([]auth.Permission, error) {
if s.authDisabled {
return perms.AllPerms, nil
}
return authtoken.ExtractSignedPermissions(s.auth, token)
}

Expand Down
2 changes: 1 addition & 1 deletion libs/pidstore/pidstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestPutLoad(t *testing.T) {

ds := sync.MutexWrap(datastore.NewMapDatastore())

t.Run("unitialized-pidstore", func(t *testing.T) {
t.Run("uninitialized-pidstore", func(t *testing.T) {
testPutLoad(ctx, ds, t)
})
t.Run("initialized-pidstore", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func ConstructModule(tp node.Type, network p2p.Network, cfg *Config, store Store
state.ConstructModule(tp, &cfg.State, &cfg.Core),
modhead.ConstructModule[*header.ExtendedHeader](tp, &cfg.Header),
share.ConstructModule(tp, &cfg.Share),
rpc.ConstructModule(tp, &cfg.RPC),
gateway.ConstructModule(tp, &cfg.Gateway),
core.ConstructModule(tp, &cfg.Core),
das.ConstructModule(tp, &cfg.DASer),
fraud.ConstructModule(tp),
blob.ConstructModule(),
node.ConstructModule(tp),
prune.ConstructModule(tp),
rpc.ConstructModule(tp, &cfg.RPC),
)

return fx.Module(
Expand Down
8 changes: 8 additions & 0 deletions nodebuilder/node/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ func (m *module) Info(context.Context) (Info, error) {
}, nil
}

func (m *module) Ready(context.Context) (bool, error) {
// Because the node uses FX to provide the RPC last, all services' lifecycles have been started by
// the point this endpoint is available. It is not 100% guaranteed at this point that all services
// are fully ready, but it is very high probability and all endpoints are available at this point
// regardless.
return true, nil
}

func (m *module) LogLevelSet(_ context.Context, name, level string) error {
return logging.SetLogLevel(name, level)
}
Expand Down
8 changes: 8 additions & 0 deletions nodebuilder/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ type Module interface {
// Info returns administrative information about the node.
Info(context.Context) (Info, error)

// Ready returns true once the node's RPC is ready to accept requests.
Ready(context.Context) (bool, error)

// LogLevelSet sets the given component log level to the given level.
LogLevelSet(ctx context.Context, name, level string) error

Expand All @@ -28,6 +31,7 @@ var _ Module = (*API)(nil)
type API struct {
Internal struct {
Info func(context.Context) (Info, error) `perm:"admin"`
Ready func(context.Context) (bool, error) `perm:"read"`
LogLevelSet func(ctx context.Context, name, level string) error `perm:"admin"`
AuthVerify func(ctx context.Context, token string) ([]auth.Permission, error) `perm:"admin"`
AuthNew func(ctx context.Context, perms []auth.Permission) (string, error) `perm:"admin"`
Expand All @@ -38,6 +42,10 @@ func (api *API) Info(ctx context.Context) (Info, error) {
return api.Internal.Info(ctx)
}

func (api *API) Ready(ctx context.Context) (bool, error) {
return api.Internal.Ready(ctx)
}

func (api *API) LogLevelSet(ctx context.Context, name, level string) error {
return api.Internal.LogLevelSet(ctx, name, level)
}
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/p2p/cmd/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ var connectednessCmd = &cobra.Command{

var natStatusCmd = &cobra.Command{
Use: "nat-status",
Short: "Gets the currrent NAT status",
Short: "Gets the current NAT status",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
client, err := cmdnode.ParseClientFromCtx(cmd.Context())
Expand Down
8 changes: 5 additions & 3 deletions nodebuilder/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (
)

type Config struct {
Address string
Port string
Address string
Port string
SkipAuth bool
}

func DefaultConfig() Config {
return Config{
Address: defaultBindAddress,
// do NOT expose the same port as celestia-core by default so that both can run on the same machine
Port: defaultPort,
Port: defaultPort,
SkipAuth: false,
}
}

Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/rpc/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,5 @@ func registerEndpoints(
}

func server(cfg *Config, auth jwt.Signer) *rpc.Server {
return rpc.NewServer(cfg.Address, cfg.Port, auth)
return rpc.NewServer(cfg.Address, cfg.Port, cfg.SkipAuth, auth)
}
16 changes: 16 additions & 0 deletions nodebuilder/rpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package rpc
import (
"fmt"

logging "github.com/ipfs/go-log/v2"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
)

var (
log = logging.Logger("rpc")
addrFlag = "rpc.addr"
portFlag = "rpc.port"
authFlag = "rpc.skip-auth"
)

// Flags gives a set of hardcoded node/rpc package flags.
Expand All @@ -26,6 +29,11 @@ func Flags() *flag.FlagSet {
"",
fmt.Sprintf("Set a custom RPC port (default: %s)", defaultPort),
)
flags.Bool(
authFlag,
false,
"Skips authentication for RPC requests",
)

return flags
}
Expand All @@ -40,4 +48,12 @@ func ParseFlags(cmd *cobra.Command, cfg *Config) {
if port != "" {
cfg.Port = port
}
ok, err := cmd.Flags().GetBool(authFlag)
if err != nil {
panic(err)
}
if ok {
log.Warn("RPC authentication is disabled")
cfg.SkipAuth = true
}
}
6 changes: 5 additions & 1 deletion nodebuilder/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func TestNodeModule(t *testing.T) {
require.NoError(t, err)
require.Equal(t, info.APIVersion, node.APIVersion)

ready, err := client.Node.Ready(ctx)
require.NoError(t, err)
require.True(t, ready)

perms, err := client.Node.AuthVerify(ctx, jwt)
require.NoError(t, err)
require.Equal(t, perms, adminPerms)
Expand Down Expand Up @@ -89,7 +93,7 @@ func TestGetByHeight(t *testing.T) {
require.ErrorContains(t, err, "given height is from the future")
}

// TestBlobRPC ensures that blobs can be submited via rpc
// TestBlobRPC ensures that blobs can be submitted via rpc
func TestBlobRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
t.Cleanup(cancel)
Expand Down
4 changes: 2 additions & 2 deletions nodebuilder/tests/swamp/swamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

var blackholeIP6 = net.ParseIP("100::")

// DefaultTestTimeout should be used as the default timout on all the Swamp tests.
// DefaultTestTimeout should be used as the default timeout on all the Swamp tests.
// It's generously set to 5 minutes to give enough time for CI.
const DefaultTestTimeout = time.Minute * 5

Expand Down Expand Up @@ -331,7 +331,7 @@ func (s *Swamp) Disconnect(t *testing.T, peerA, peerB *nodebuilder.Node) {
// SetBootstrapper sets the given bootstrappers as the "bootstrappers" for the
// Swamp test suite. Every new full or light node created on the suite afterwards
// will automatically add the suite's bootstrappers as trusted peers to their config.
// NOTE: Bridge nodes do not automaatically add the bootstrappers as trusted peers.
// NOTE: Bridge nodes do not automatically add the bootstrappers as trusted peers.
// NOTE: Use `NewNodeWithStore` to avoid this automatic configuration.
func (s *Swamp) SetBootstrapper(t *testing.T, bootstrappers ...*nodebuilder.Node) {
for _, trusted := range bootstrappers {
Expand Down
4 changes: 2 additions & 2 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func TestShrexGetter(t *testing.T) {
t.Cleanup(cancel)

// generate test data
eds, dah, maxNamesapce := generateTestEDS(t)
eds, dah, maxNamespace := generateTestEDS(t)
eh := headertest.RandExtendedHeaderWithRoot(t, dah)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

namespace, err := addToNamespace(maxNamesapce, 1)
namespace, err := addToNamespace(maxNamespace, 1)
require.NoError(t, err)
// check for namespace to be not in root
require.Len(t, ipld.FilterRootByNamespace(dah, namespace), 0)
Expand Down
8 changes: 4 additions & 4 deletions share/ipld/corrupted_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) {
t.Cleanup(cancel)
net := availability_test.NewTestDAGNet(ctx, t)

requestor := full.Node(net)
requester := full.Node(net)
provider, mockBS := availability_test.MockNode(t, net)
provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService))
net.ConnectAll()
Expand All @@ -37,15 +37,15 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) {
eh := headertest.RandExtendedHeaderWithRoot(t, root)
getCtx, cancelGet := context.WithTimeout(ctx, sharesAvailableTimeout)
t.Cleanup(cancelGet)
err := requestor.SharesAvailable(getCtx, eh)
err := requester.SharesAvailable(getCtx, eh)
require.NoError(t, err)

// clear the storage of the requester so that it must retrieve again, then start attacking
// we reinitialize the node to clear the eds store
requestor = full.Node(net)
requester = full.Node(net)
mockBS.Attacking = true
getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout)
t.Cleanup(cancelGet)
err = requestor.SharesAvailable(getCtx, eh)
err = requester.SharesAvailable(getCtx, eh)
require.ErrorIs(t, err, share.ErrNotAvailable)
}
2 changes: 1 addition & 1 deletion share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
log.Debugw("stored initial height", "height", h.Height())
}

// update storeFrom if header heigh
// update storeFrom if header height
m.storeFrom.Store(uint64(max(0, int(h.Height())-storedPoolsAmount)))
log.Debugw("updated lowest stored height", "height", h.Height())
}
Expand Down
2 changes: 1 addition & 1 deletion state/core_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (ca *CoreAccessor) constructSignedTx(
}

// SubmitPayForBlob builds, signs, and synchronously submits a MsgPayForBlob. It blocks until the
// transaction is committed and returns the TxReponse. If gasLim is set to 0, the method will
// transaction is committed and returns the TxResponse. If gasLim is set to 0, the method will
// automatically estimate the gas limit. If the fee is negative, the method will use the nodes min
// gas price multiplied by the gas limit.
func (ca *CoreAccessor) SubmitPayForBlob(
Expand Down

0 comments on commit 0f2175c

Please sign in to comment.