Skip to content

Commit

Permalink
Merge branch 'main' into chore/ramin/rename-cmd-parse-client
Browse files Browse the repository at this point in the history
  • Loading branch information
ramin authored Jan 18, 2024
2 parents c4637a1 + 36205cc commit 33fe9f6
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 124 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/go-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ jobs:
needs: [lint, go_mod_tidy_check]
name: Run Unit Tests with Race Detector
runs-on: ubuntu-latest
continue-on-error: true

steps:
- uses: actions/checkout@v4
Expand All @@ -124,6 +125,7 @@ jobs:
needs: [lint, go_mod_tidy_check]
name: Run Integration Tests
runs-on: ubuntu-latest
continue-on-error: true

steps:
- uses: actions/checkout@v4
Expand Down
20 changes: 0 additions & 20 deletions api/gateway/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,15 @@ package gateway

import (
"context"
"errors"
"net/http"
"time"

"github.com/gorilla/mux"

"github.com/celestiaorg/celestia-node/nodebuilder/state"
)

const timeout = time.Minute

func (h *Handler) RegisterMiddleware(srv *Server) {
srv.RegisterMiddleware(
setContentType,
checkPostDisabled(h.state),
wrapRequestContext,
enableCors,
)
Expand All @@ -36,20 +30,6 @@ func setContentType(next http.Handler) http.Handler {
})
}

// checkPostDisabled ensures that context was canceled and prohibit POST requests.
func checkPostDisabled(state state.Module) mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// check if state service was halted and deny the transaction
if r.Method == http.MethodPost && state.IsStopped(r.Context()) {
writeError(w, http.StatusMethodNotAllowed, r.URL.Path, errors.New("not possible to submit data"))
return
}
next.ServeHTTP(w, r)
})
}
}

// wrapRequestContext ensures we implement a deadline on serving requests
// via the gateway server-side to prevent context leaks.
func wrapRequestContext(next http.Handler) http.Handler {
Expand Down
4 changes: 3 additions & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher)
Expand Down
20 changes: 17 additions & 3 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
var (
tracer = otel.Tracer("core/listener")
retrySubscriptionDelay = 5 * time.Second

errInvalidSubscription = errors.New("invalid subscription")
)

// Listener is responsible for listening to Core for
Expand All @@ -41,11 +43,12 @@ type Listener struct {
headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn

listenerTimeout time.Duration

metrics *listenerMetrics

cancel context.CancelFunc
chainID string

listenerTimeout time.Duration
cancel context.CancelFunc
}

func NewListener(
Expand Down Expand Up @@ -81,6 +84,7 @@ func NewListener(
store: store,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
}, nil
}

Expand Down Expand Up @@ -117,6 +121,10 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat
// listener stopped because external context was canceled
return
}
if errors.Is(err, errInvalidSubscription) {
// stop node if there is a critical issue with the block subscription
log.Fatalf("listener: %v", err)
}

log.Warnw("listener: subscriber error, resubscribing...", "err", err)
sub = cl.resubscribe(ctx)
Expand Down Expand Up @@ -163,6 +171,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned
return errors.New("underlying subscription was closed")
}

if cl.chainID != "" && b.Header.ChainID != cl.chainID {
log.Errorf("listener: received block with unexpected chain ID: expected %s,"+
" received %s", cl.chainID, b.Header.ChainID)
return errInvalidSubscription
}

log.Debugw("listener: new block from core", "height", b.Header.Height)

err := cl.handleNewSignedBlock(ctx, b)
Expand Down
45 changes: 41 additions & 4 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ func TestListener(t *testing.T) {
t.Cleanup(subs.Cancel)

// create one block to store as Head in local store and then unsubscribe from block events
fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

eds := createEdsPubSub(ctx, t)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t))
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -80,6 +84,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

Expand All @@ -92,7 +97,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store)
cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -124,6 +129,36 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
require.Nil(t, cl.cancel)
}

func TestListenerWithWrongChainRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// create mocknet with two pubsub endpoints
ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t)

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)
err := store.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc")
sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)

err = cl.listen(ctx, sub)
assert.ErrorIs(t, err, errInvalidSubscription)
}

func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
Expand Down Expand Up @@ -166,6 +201,7 @@ func createListener(
ps *pubsub.PubSub,
edsSub *shrexsub.PubSub,
store *eds.Store,
chainID string,
) *Listener {
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID))
require.NoError(t, err)
Expand All @@ -180,7 +216,8 @@ func createListener(
require.NoError(t, p2pSub.Stop(ctx))
})

listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, store, nodep2p.BlockTime)
listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader,
store, nodep2p.BlockTime, WithChainID(nodep2p.Network(chainID)))
require.NoError(t, err)
return listener
}
Expand Down
10 changes: 10 additions & 0 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package core

import "github.com/celestiaorg/celestia-node/nodebuilder/p2p"

type Option func(*params)

type params struct {
metrics bool

chainID string
}

// WithMetrics is a functional option that enables metrics
Expand All @@ -13,3 +17,9 @@ func WithMetrics() Option {
p.metrics = true
}
}

func WithChainID(id p2p.Network) Option {
return func(p *params) {
p.chainID = id.String()
}
}
Loading

0 comments on commit 33fe9f6

Please sign in to comment.