Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Do not propagate blocks if subscribed to blocks from incorrect chain #3086

Merged
merged 7 commits into from
Jan 17, 2024
Merged
4 changes: 3 additions & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

// generate 10 blocks
generateBlocks(t, fetcher)
Expand Down
20 changes: 17 additions & 3 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
var (
tracer = otel.Tracer("core/listener")
retrySubscriptionDelay = 5 * time.Second

errInvalidSubscription = errors.New("invalid subscription")
)

// Listener is responsible for listening to Core for
Expand All @@ -41,11 +43,12 @@ type Listener struct {
headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn

listenerTimeout time.Duration

metrics *listenerMetrics

cancel context.CancelFunc
chainID string

listenerTimeout time.Duration
cancel context.CancelFunc
}

func NewListener(
Expand Down Expand Up @@ -81,6 +84,7 @@ func NewListener(
store: store,
listenerTimeout: 5 * blocktime,
metrics: metrics,
chainID: p.chainID,
}, nil
}

Expand Down Expand Up @@ -117,6 +121,10 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat
// listener stopped because external context was canceled
return
}
if errors.Is(err, errInvalidSubscription) {
// stop node if there is a critical issue with the block subscription
log.Fatalf("listener: %v", err)
}

log.Warnw("listener: subscriber error, resubscribing...", "err", err)
sub = cl.resubscribe(ctx)
Expand Down Expand Up @@ -163,6 +171,12 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned
return errors.New("underlying subscription was closed")
}

if cl.chainID != "" && b.Header.ChainID != cl.chainID {
log.Errorf("listener: received block with unexpected chain ID: expected %s,"+
" received %s", cl.chainID, b.Header.ChainID)
return errInvalidSubscription
}

log.Debugw("listener: new block from core", "height", b.Header.Height)

err := cl.handleNewSignedBlock(ctx, b)
Expand Down
45 changes: 41 additions & 4 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ func TestListener(t *testing.T) {
t.Cleanup(subs.Cancel)

// create one block to store as Head in local store and then unsubscribe from block events
fetcher, _ := createCoreFetcher(t, DefaultTestConfig())
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)

eds := createEdsPubSub(ctx, t)

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t))
cl := createListener(ctx, t, fetcher, ps0, eds, createStore(t), networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -80,6 +84,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, cctx := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

Expand All @@ -92,7 +97,7 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store)
cl := createListener(ctx, t, fetcher, ps0, eds, store, networkID)
err = cl.Start(ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -124,6 +129,36 @@ func TestListenerWithNonEmptyBlocks(t *testing.T) {
require.Nil(t, cl.cancel)
}

func TestListenerWithWrongChainRPC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

// create mocknet with two pubsub endpoints
ps0, _ := createMocknetWithTwoPubsubEndpoints(ctx, t)

// create one block to store as Head in local store and then unsubscribe from block events
cfg := DefaultTestConfig()
cfg.ChainID = networkID
fetcher, _ := createCoreFetcher(t, cfg)
eds := createEdsPubSub(ctx, t)

store := createStore(t)
err := store.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
err = store.Stop(ctx)
require.NoError(t, err)
})

// create Listener and start listening
cl := createListener(ctx, t, fetcher, ps0, eds, store, "wrong-chain-rpc")
sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)

err = cl.listen(ctx, sub)
assert.ErrorIs(t, err, errInvalidSubscription)
}

func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) {
net, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
Expand Down Expand Up @@ -166,6 +201,7 @@ func createListener(
ps *pubsub.PubSub,
edsSub *shrexsub.PubSub,
store *eds.Store,
chainID string,
) *Listener {
p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID))
require.NoError(t, err)
Expand All @@ -180,7 +216,8 @@ func createListener(
require.NoError(t, p2pSub.Stop(ctx))
})

listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, store, nodep2p.BlockTime)
listener, err := NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader,
store, nodep2p.BlockTime, WithChainID(nodep2p.Network(chainID)))
require.NoError(t, err)
return listener
}
Expand Down
10 changes: 10 additions & 0 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package core

import "github.com/celestiaorg/celestia-node/nodebuilder/p2p"

type Option func(*params)

type params struct {
metrics bool

chainID string
}

// WithMetrics is a functional option that enables metrics
Expand All @@ -13,3 +17,9 @@ func WithMetrics() Option {
p.metrics = true
}
}

func WithChainID(id p2p.Network) Option {
return func(p *params) {
p.chainID = id.String()
}
}
4 changes: 3 additions & 1 deletion libs/utils/resetctx.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import "context"
import (
"context"
)

// ResetContextOnError returns a fresh context if the given context has an error.
func ResetContextOnError(ctx context.Context) context.Context {
Expand Down
3 changes: 2 additions & 1 deletion nodebuilder/core/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
pubsub *shrexsub.PubSub,
construct header.ConstructFn,
store *eds.Store,
chainID p2p.Network,
) (*core.Listener, error) {
var opts []core.Option
opts := []core.Option{core.WithChainID(chainID)}
if MetricsEnabled {
opts = append(opts, core.WithMetrics())
}
Expand Down
Loading