Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
adlerjohn authored Feb 11, 2025
2 parents 8416d1d + d571074 commit 3c672df
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 10 deletions.
10 changes: 10 additions & 0 deletions core/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"github.com/tendermint/tendermint/types"
"google.golang.org/grpc"
"google.golang.org/grpc/status"

libhead "github.com/celestiaorg/go-header"
)
Expand Down Expand Up @@ -173,9 +174,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types

subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{})
if err != nil {
close(f.doneCh)
f.isListeningForBlocks.Store(false)
return nil, err
}

log.Debug("created a subscription. Start listening for new blocks...")
signedBlockCh := make(chan types.EventDataSignedBlock)
go func() {
defer close(f.doneCh)
Expand All @@ -189,6 +193,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
resp, err := subscription.Recv()
if err != nil {
log.Errorw("fetcher: error receiving new height", "err", err.Error())
_, ok := status.FromError(err) // parsing the gRPC error
if ok {
// ok means that err contains a gRPC status error.
// move on another round of resubscribing.
return
}
continue
}
withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second)
Expand Down
130 changes: 130 additions & 0 deletions core/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package core

import (
"context"
"errors"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"google.golang.org/grpc"
)

func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
Expand Down Expand Up @@ -40,3 +43,130 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) {
}
require.NoError(t, fetcher.Stop(ctx))
}

type mockAPIService struct {
coregrpc.UnimplementedBlockAPIServer

grpcServer *grpc.Server
fetcher *BlockFetcher
}

func (m *mockAPIService) SubscribeNewHeights(
_ *coregrpc.SubscribeNewHeightsRequest,
srv coregrpc.BlockAPI_SubscribeNewHeightsServer,
) error {
for i := 0; i < 20; i++ {
b, err := m.fetcher.GetBlock(context.Background(), int64(i))
if err != nil {
return err
}
err = srv.Send(&coregrpc.NewHeightEvent{Height: b.Header.Height, Hash: b.Header.Hash()})
if err != nil {
return err
}
time.Sleep(time.Second)
}
return nil
}

func (m *mockAPIService) BlockByHeight(
req *coregrpc.BlockByHeightRequest,
srv coregrpc.BlockAPI_BlockByHeightServer,
) error {
b, err := m.fetcher.client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: req.Height})
if err != nil {
return err
}
data, err := b.Recv()
if err != nil {
return err
}
err = srv.Send(data)
if err != nil {
return err
}
return nil
}

func (m *mockAPIService) Start() error {
listener, err := net.Listen("tcp", ":50051")
if err != nil {
return err
}

grpcServer := grpc.NewServer()
m.grpcServer = grpcServer
coregrpc.RegisterBlockAPIServer(grpcServer, m)
go func() {
err = grpcServer.Serve(listener)
if err != nil && !errors.Is(err, grpc.ErrServerStopped) {
panic(err)
}
}()
return nil
}

func (m *mockAPIService) Stop() error {
m.grpcServer.Stop()
return nil
}

func (m *mockAPIService) generateBlocksWithHeights(ctx context.Context, t *testing.T) {
cfg := DefaultTestConfig()
fetcher, cctx := createCoreFetcher(t, cfg)
m.fetcher = fetcher
generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
require.NoError(t, fetcher.Stop(ctx))
}

// TestStart_SubscribeNewBlockEvent_Resubscription ensures that subscription will not stuck in case
// gRPC server was stopped.
func TestStart_SubscribeNewBlockEvent_Resubscription(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
m := &mockAPIService{}
m.generateBlocksWithHeights(ctx, t)

require.NoError(t, m.Start())

client := newTestClient(t, "localhost", "50051")

fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)
// subscribe to block event to get blocks
newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)

select {
case newBlockFromChan := <-newBlockChan:
h := newBlockFromChan.Header.Height
_, err := fetcher.GetSignedBlock(ctx, h)
require.NoError(t, err)
case <-ctx.Done():
require.NoError(t, ctx.Err())
}

require.NoError(t, m.Stop())

// stopping the server sends an error with the status code
// to client, so the subscription loop will be finished.
// check that newBlockChan was closed
_, ok := <-newBlockChan
require.False(t, ok)

// start server and try to get a new subscription
require.NoError(t, m.Start())
newBlockChan, err = fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
select {
case newBlockFromChan := <-newBlockChan:
h := newBlockFromChan.Header.Height
_, err := fetcher.GetSignedBlock(ctx, h)
require.NoError(t, err)
case <-ctx.Done():
require.NoError(t, ctx.Err())
}
require.NoError(t, m.Stop())
require.NoError(t, m.fetcher.Stop(ctx))
require.NoError(t, fetcher.Stop(ctx))
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
module github.com/celestiaorg/celestia-node

go 1.23.2
go 1.23.5

require (
cosmossdk.io/math v1.4.0
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app/v3 v3.3.0-arabica
github.com/celestiaorg/celestia-app/v3 v3.3.1
github.com/celestiaorg/go-fraud v0.2.1
github.com/celestiaorg/go-header v0.6.4
github.com/celestiaorg/go-libp2p-messenger v0.2.0
github.com/celestiaorg/go-square/merkle v0.0.0-20240117232118-fd78256df076
github.com/celestiaorg/go-square/v2 v2.1.0
github.com/celestiaorg/nmt v0.22.2
github.com/celestiaorg/nmt v0.23.0
github.com/celestiaorg/rsmt2d v0.14.0
github.com/cosmos/cosmos-sdk v0.46.16
github.com/cristalhq/jwt/v5 v5.4.0
Expand Down Expand Up @@ -357,7 +357,7 @@ replace (
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1
// broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app
github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35
github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35
)

replace github.com/ipfs/boxo => github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI
github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ=
github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84ZUbO/6/8XlODkFsj/ln85AE0F6pj8=
github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc=
github.com/celestiaorg/celestia-app/v3 v3.3.0-arabica h1:MQHm5NvWt5cQl5YHpTHvuGb55/1uWQg3SNypkVt5rhU=
github.com/celestiaorg/celestia-app/v3 v3.3.0-arabica/go.mod h1:MKhiQgATDdLouzC5KvXDAnDpEgIXyD0MNiq0ChrWFco=
github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35 h1:T21AhezjcByAlWDHmiVbpg743Uqk/dqBzJkQsAnbQf8=
github.com/celestiaorg/celestia-core v1.45.0-tm-v0.34.35/go.mod h1:fQ46s1hYFTGFBsHsuGsbxDZ720ZPQow5Iyqw+yErZSo=
github.com/celestiaorg/celestia-app/v3 v3.3.1 h1:e0iSWbf84mMOGU3aVCDd+I7a7wUQLXurHXhcmG6lyQI=
github.com/celestiaorg/celestia-app/v3 v3.3.1/go.mod h1:FSv7/cIGoZIzcQIQPxTYYDeCO78A4VmC20jxf3Oqn4Y=
github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35 h1:K0kSVRlKfsPwfiA4o8GNUNPfZ+wF1MnYajom4CzJxpQ=
github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35/go.mod h1:FSd32MUffdVUYIXW+m/1v5pHptRQF2RJC88fwsgrKG8=
github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 h1:qxWiGrDEcg4FzVTpIXU/v3wjP7q1Lz4AMhSBBRABInU=
github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16/go.mod h1:W30mNt3+2l516HVR8Gt9+Gf4qOrWC9/x18MTEx2GljE=
github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w=
Expand All @@ -367,8 +367,8 @@ github.com/celestiaorg/go-square/v2 v2.1.0 h1:ECIvYEeHIWiIJGDCJxQNtzqm5DmnBly7XG
github.com/celestiaorg/go-square/v2 v2.1.0/go.mod h1:n3ztrh8CBjWOD6iWYMo3pPOlQIgzLK9yrnqMPcNo6g8=
github.com/celestiaorg/merkletree v0.0.0-20230308153949-c33506a7aa26 h1:P2RI1xJ49EZ8cuHMcH+ZSBonfRDtBS8OS9Jdt1BWX3k=
github.com/celestiaorg/merkletree v0.0.0-20230308153949-c33506a7aa26/go.mod h1:2m8ukndOegwB0PU0AfJCwDUQHqd7QQRlSXvQL5VToVY=
github.com/celestiaorg/nmt v0.22.2 h1:JmOMtZL9zWAed1hiwb9DDs+ELcKp/ZQZ3rPverge/V8=
github.com/celestiaorg/nmt v0.22.2/go.mod h1:/7huDiSRL/d2EGhoiKctgSzmLOJoWG8yEfbFtY1+Mow=
github.com/celestiaorg/nmt v0.23.0 h1:cfYy//hL1HeDSH0ub3CPlJuox5U5xzgg4JGZrw23I/I=
github.com/celestiaorg/nmt v0.23.0/go.mod h1:kYfIjRq5rmA2mJnv41GLWkxn5KyLNPlma3v5Q68rHdI=
github.com/celestiaorg/rsmt2d v0.14.0 h1:L7XJ3tRJDY8sQcvCjzHq0L7JmsmaSD+VItymIYFLqYc=
github.com/celestiaorg/rsmt2d v0.14.0/go.mod h1:4kxqiTdFev49sGiKXTDjohbWYOG5GlcIfftTgaBJnpc=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/tests/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,13 @@ func TestConvertFromArchivalToPruned(t *testing.T) {
FailedHeaders map[uint64]struct{} `json:"failed"`
}

host, port, err := net.SplitHostPort(sw.ClientContext.GRPCClient.Target())
require.NoError(t, err)

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)
archivalCfg.Core.IP = host
archivalCfg.Core.Port = port

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
Expand Down

0 comments on commit 3c672df

Please sign in to comment.