From 482f452be6568f3112666178a06a92c517928fea Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 7 Feb 2025 17:31:01 +0200 Subject: [PATCH 1/8] fix(test/swamp): fix TestConvertFromArchivalToPruned (#4097) --- nodebuilder/tests/prune_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go index 308949f941..d3f6af08c6 100644 --- a/nodebuilder/tests/prune_test.go +++ b/nodebuilder/tests/prune_test.go @@ -245,8 +245,13 @@ func TestConvertFromArchivalToPruned(t *testing.T) { FailedHeaders map[uint64]struct{} `json:"failed"` } + host, port, err := net.SplitHostPort(sw.ClientContext.GRPCClient.Target()) + require.NoError(t, err) + for _, nt := range []node.Type{node.Bridge, node.Full} { archivalCfg := nodebuilder.DefaultConfig(nt) + archivalCfg.Core.IP = host + archivalCfg.Core.Port = port store := nodebuilder.MockStore(t, archivalCfg) ds, err := store.Datastore() From b2e896772bc43a43dfd15045db505e0cf440ee0d Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 7 Feb 2025 18:22:20 +0200 Subject: [PATCH 2/8] fix(core/fetcher): resubscribe if consensus node goes offline (#4096) --- core/fetcher.go | 10 ++++ core/fetcher_test.go | 130 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/core/fetcher.go b/core/fetcher.go index 6f049cc7ac..e47b913d96 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -13,6 +13,7 @@ import ( coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" "google.golang.org/grpc" + "google.golang.org/grpc/status" libhead "github.com/celestiaorg/go-header" ) @@ -173,9 +174,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { + close(f.doneCh) + f.isListeningForBlocks.Store(false) return nil, err } + log.Debug("created a subscription. Start listening for new blocks...") signedBlockCh := make(chan types.EventDataSignedBlock) go func() { defer close(f.doneCh) @@ -189,6 +193,12 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types resp, err := subscription.Recv() if err != nil { log.Errorw("fetcher: error receiving new height", "err", err.Error()) + _, ok := status.FromError(err) // parsing the gRPC error + if ok { + // ok means that err contains a gRPC status error. + // move on another round of resubscribing. + return + } continue } withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 8d7659494d..0f2c4dc93c 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -2,12 +2,15 @@ package core import ( "context" + "errors" "net" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" + "google.golang.org/grpc" ) func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { @@ -40,3 +43,130 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { } require.NoError(t, fetcher.Stop(ctx)) } + +type mockAPIService struct { + coregrpc.UnimplementedBlockAPIServer + + grpcServer *grpc.Server + fetcher *BlockFetcher +} + +func (m *mockAPIService) SubscribeNewHeights( + _ *coregrpc.SubscribeNewHeightsRequest, + srv coregrpc.BlockAPI_SubscribeNewHeightsServer, +) error { + for i := 0; i < 20; i++ { + b, err := m.fetcher.GetBlock(context.Background(), int64(i)) + if err != nil { + return err + } + err = srv.Send(&coregrpc.NewHeightEvent{Height: b.Header.Height, Hash: b.Header.Hash()}) + if err != nil { + return err + } + time.Sleep(time.Second) + } + return nil +} + +func (m *mockAPIService) BlockByHeight( + req *coregrpc.BlockByHeightRequest, + srv coregrpc.BlockAPI_BlockByHeightServer, +) error { + b, err := m.fetcher.client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: req.Height}) + if err != nil { + return err + } + data, err := b.Recv() + if err != nil { + return err + } + err = srv.Send(data) + if err != nil { + return err + } + return nil +} + +func (m *mockAPIService) Start() error { + listener, err := net.Listen("tcp", ":50051") + if err != nil { + return err + } + + grpcServer := grpc.NewServer() + m.grpcServer = grpcServer + coregrpc.RegisterBlockAPIServer(grpcServer, m) + go func() { + err = grpcServer.Serve(listener) + if err != nil && !errors.Is(err, grpc.ErrServerStopped) { + panic(err) + } + }() + return nil +} + +func (m *mockAPIService) Stop() error { + m.grpcServer.Stop() + return nil +} + +func (m *mockAPIService) generateBlocksWithHeights(ctx context.Context, t *testing.T) { + cfg := DefaultTestConfig() + fetcher, cctx := createCoreFetcher(t, cfg) + m.fetcher = fetcher + generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) + require.NoError(t, fetcher.Stop(ctx)) +} + +// TestStart_SubscribeNewBlockEvent_Resubscription ensures that subscription will not stuck in case +// gRPC server was stopped. +func TestStart_SubscribeNewBlockEvent_Resubscription(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + t.Cleanup(cancel) + m := &mockAPIService{} + m.generateBlocksWithHeights(ctx, t) + + require.NoError(t, m.Start()) + + client := newTestClient(t, "localhost", "50051") + + fetcher, err := NewBlockFetcher(client) + require.NoError(t, err) + // subscribe to block event to get blocks + newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) + require.NoError(t, err) + + select { + case newBlockFromChan := <-newBlockChan: + h := newBlockFromChan.Header.Height + _, err := fetcher.GetSignedBlock(ctx, h) + require.NoError(t, err) + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + + require.NoError(t, m.Stop()) + + // stopping the server sends an error with the status code + // to client, so the subscription loop will be finished. + // check that newBlockChan was closed + _, ok := <-newBlockChan + require.False(t, ok) + + // start server and try to get a new subscription + require.NoError(t, m.Start()) + newBlockChan, err = fetcher.SubscribeNewBlockEvent(ctx) + require.NoError(t, err) + select { + case newBlockFromChan := <-newBlockChan: + h := newBlockFromChan.Header.Height + _, err := fetcher.GetSignedBlock(ctx, h) + require.NoError(t, err) + case <-ctx.Done(): + require.NoError(t, ctx.Err()) + } + require.NoError(t, m.Stop()) + require.NoError(t, m.fetcher.Stop(ctx)) + require.NoError(t, fetcher.Stop(ctx)) +} From 4bdd1db6cc26b3b8022c199ba8e39f6483cba8e9 Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 7 Feb 2025 17:45:29 +0100 Subject: [PATCH 3/8] deps(go.mod): mocha bump for app (#4095) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ccb48fccc1..65821285c3 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app/v3 v3.3.1-arabica + github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha github.com/celestiaorg/go-fraud v0.2.1 github.com/celestiaorg/go-header v0.6.4 github.com/celestiaorg/go-libp2p-messenger v0.2.0 diff --git a/go.sum b/go.sum index 4fad4f188d..e1d9d4282f 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84ZUbO/6/8XlODkFsj/ln85AE0F6pj8= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc= -github.com/celestiaorg/celestia-app/v3 v3.3.1-arabica h1:PDXzoF/jK8JMGFGYNhqYiqWIKyivPaWDNwyOnVx1So0= -github.com/celestiaorg/celestia-app/v3 v3.3.1-arabica/go.mod h1:FSv7/cIGoZIzcQIQPxTYYDeCO78A4VmC20jxf3Oqn4Y= +github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha h1:yd27xj+Mqc+dqiXI8GArpkdJo5Di3J9+BINl1tMcuzQ= +github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha/go.mod h1:FSv7/cIGoZIzcQIQPxTYYDeCO78A4VmC20jxf3Oqn4Y= github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35 h1:K0kSVRlKfsPwfiA4o8GNUNPfZ+wF1MnYajom4CzJxpQ= github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35/go.mod h1:FSd32MUffdVUYIXW+m/1v5pHptRQF2RJC88fwsgrKG8= github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 h1:qxWiGrDEcg4FzVTpIXU/v3wjP7q1Lz4AMhSBBRABInU= From d57107415355ef6fee3fc01ba12cd3e48d7a310c Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Fri, 7 Feb 2025 17:57:39 +0100 Subject: [PATCH 4/8] deps(go.mod): bump app (#4099) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 65821285c3..a365488ed4 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b github.com/benbjohnson/clock v1.3.5 - github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha + github.com/celestiaorg/celestia-app/v3 v3.3.1 github.com/celestiaorg/go-fraud v0.2.1 github.com/celestiaorg/go-header v0.6.4 github.com/celestiaorg/go-libp2p-messenger v0.2.0 diff --git a/go.sum b/go.sum index e1d9d4282f..d8c2c7becd 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b h1:M9X7s1WJ/7Ju84ZUbO/6/8XlODkFsj/ln85AE0F6pj8= github.com/celestiaorg/boxo v0.0.0-20241118122411-70a650316c3b/go.mod h1:OpUrJtGmZZktUqJvPOtmP8wSfEFcdF/55d3PNCcYLwc= -github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha h1:yd27xj+Mqc+dqiXI8GArpkdJo5Di3J9+BINl1tMcuzQ= -github.com/celestiaorg/celestia-app/v3 v3.3.1-mocha/go.mod h1:FSv7/cIGoZIzcQIQPxTYYDeCO78A4VmC20jxf3Oqn4Y= +github.com/celestiaorg/celestia-app/v3 v3.3.1 h1:e0iSWbf84mMOGU3aVCDd+I7a7wUQLXurHXhcmG6lyQI= +github.com/celestiaorg/celestia-app/v3 v3.3.1/go.mod h1:FSv7/cIGoZIzcQIQPxTYYDeCO78A4VmC20jxf3Oqn4Y= github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35 h1:K0kSVRlKfsPwfiA4o8GNUNPfZ+wF1MnYajom4CzJxpQ= github.com/celestiaorg/celestia-core v1.47.0-tm-v0.34.35/go.mod h1:FSd32MUffdVUYIXW+m/1v5pHptRQF2RJC88fwsgrKG8= github.com/celestiaorg/cosmos-sdk v1.27.0-sdk-v0.46.16 h1:qxWiGrDEcg4FzVTpIXU/v3wjP7q1Lz4AMhSBBRABInU= From 1acbb24f9d45f60b3924772c2243d4582d898374 Mon Sep 17 00:00:00 2001 From: futreall <86553580+futreall@users.noreply.github.com> Date: Tue, 11 Feb 2025 17:59:40 +0200 Subject: [PATCH 5/8] docs: fix broken link (#4011) Co-authored-by: Oleg Kovalov --- docs/adr/adr-010-incentivized-testnet-monitoring.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/adr/adr-010-incentivized-testnet-monitoring.md b/docs/adr/adr-010-incentivized-testnet-monitoring.md index 1e82e81c1f..3d3e2b0b50 100644 --- a/docs/adr/adr-010-incentivized-testnet-monitoring.md +++ b/docs/adr/adr-010-incentivized-testnet-monitoring.md @@ -82,7 +82,7 @@ Node operators have the option of adding an additional exporter to their OTEL Co
How to monitor celestia-node with Grafana Cloud -1. [Install celestia-node](https://docs.celestia.org/developers/celestia-node) +1. [Install celestia-node](https://docs.celestia.org/how-to-guides/celestia-node#installation-options) 2. Sign up for an account on [Grafana](https://grafana.com/) 3. [Install OTEL Collector](https://opentelemetry.io/docs/collector/getting-started/) on the same machine as celestia-node. If on a Linux machine follow [these steps](https://opentelemetry.io/docs/collector/getting-started/#linux-packaging=). OTEL Collector should start automatically immediately after installation. 4. Configure OTEL Collector to receive metrics from celestia-node by confirming your `/etc/otelcol/config.yaml` has the default config: @@ -127,7 +127,7 @@ Node operators have the option of adding an additional exporter to their OTEL Co
How to monitor celestia-node with Uptrace -1. [Install celestia-node](https://docs.celestia.org/developers/celestia-node). +1. [Install celestia-node](https://docs.celestia.org/how-to-guides/celestia-node#installation-options). 2. Create an account on [Uptrace](https://app.uptrace.dev/). 3. Create a project on Uptrace. 4. Follow [these steps](https://uptrace.dev/opentelemetry/collector.html#when-to-use-opentelemetry-collector=) to install OTEL Collector Contrib on the same host as celestia-node. From 6bd26708cf074cf98c8bece696922c937290c57b Mon Sep 17 00:00:00 2001 From: Josh Stein <46639943+jcstein@users.noreply.github.com> Date: Wed, 12 Feb 2025 03:42:44 -0500 Subject: [PATCH 6/8] fix: cli with malformed height (#4107) --- nodebuilder/blob/cmd/blob.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/nodebuilder/blob/cmd/blob.go b/nodebuilder/blob/cmd/blob.go index 17eec5bbf1..5541b7c8df 100644 --- a/nodebuilder/blob/cmd/blob.go +++ b/nodebuilder/blob/cmd/blob.go @@ -43,12 +43,12 @@ var getCmd = &cobra.Command{ Short: "Returns the blob for the given namespace by commitment at a particular height.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[0], "0x") { - args[0] = "0x" + args[0] - } if !strings.HasPrefix(args[1], "0x") { args[1] = "0x" + args[1] } + if !strings.HasPrefix(args[2], "0x") { + args[2] = "0x" + args[2] + } return nil }, RunE: func(cmd *cobra.Command, args []string) error { @@ -84,9 +84,6 @@ var getAllCmd = &cobra.Command{ Short: "Returns all blobs for the given namespace at a particular height.\n" + "Note:\n* Namespace input parameter is expected to be in its hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[0], "0x") { - args[0] = "0x" + args[0] - } if !strings.HasPrefix(args[1], "0x") { args[1] = "0x" + args[1] } @@ -150,7 +147,7 @@ var submitCmd = &cobra.Command{ "returns the header height in which the blob(s) was/were include + the respective commitment(s).\n" + "User can use namespace and blobData as argument for single blob submission \n" + "or use --input-file flag with the path to a json file for multiple blobs submission, \n" + - `where the json file contains: + `where the json file contains: { "Blobs": [ @@ -241,12 +238,12 @@ var getProofCmd = &cobra.Command{ Short: "Retrieves the blob in the given namespaces at the given height by commitment and returns its Proof.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[0], "0x") { - args[0] = "0x" + args[0] - } if !strings.HasPrefix(args[1], "0x") { args[1] = "0x" + args[1] } + if !strings.HasPrefix(args[2], "0x") { + args[2] = "0x" + args[2] + } return nil }, RunE: func(cmd *cobra.Command, args []string) error { From 5185fc8e0a9cf8f538e0f7d3b5c2367fdcd74aff Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Thu, 13 Feb 2025 11:17:39 +0100 Subject: [PATCH 7/8] fix(docgen): add network.Direction (#4115) --- api/docgen/examples.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/docgen/examples.go b/api/docgen/examples.go index 572d5c3f04..332194e14d 100644 --- a/api/docgen/examples.go +++ b/api/docgen/examples.go @@ -183,6 +183,8 @@ func init() { state.WithSignerAddress("celestia1pjcmwj8w6hyr2c4wehakc5g8cfs36aysgucx66"), state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"), )) + + add(network.DirUnknown) } func exampleValue(t, parent reflect.Type) (any, error) { From 70d6351231dda7d80eaad02a992a31980c85b86d Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 13 Feb 2025 13:50:13 +0200 Subject: [PATCH 8/8] fix(core): add retry logic for the grpc subscription (#4093) --- core/exchange_test.go | 10 +-- core/fetcher.go | 122 +++++++++++--------------- core/fetcher_no_race_test.go | 3 - core/fetcher_test.go | 143 ++++++++----------------------- core/header_test.go | 4 +- core/listener.go | 85 ++++-------------- core/listener_test.go | 4 +- core/testing.go | 112 +++++++++++++++++++++++- go.mod | 2 +- nodebuilder/core/constructors.go | 28 +++++- 10 files changed, 245 insertions(+), 268 deletions(-) diff --git a/core/exchange_test.go b/core/exchange_test.go index 79df507b0f..3e46aedf57 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -24,7 +24,6 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { cfg := DefaultTestConfig() fetcher, cctx := createCoreFetcher(t, cfg) - generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) @@ -62,7 +61,6 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { require.NoError(t, err) assert.True(t, has) } - require.NoError(t, fetcher.Stop(ctx)) } // TestExchange_DoNotStoreHistoric tests that the CoreExchange will not @@ -73,7 +71,6 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { cfg := DefaultTestConfig() fetcher, cctx := createCoreFetcher(t, cfg) - generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) @@ -121,7 +118,6 @@ func TestExchange_StoreHistoricIfArchival(t *testing.T) { cfg := DefaultTestConfig() fetcher, cctx := createCoreFetcher(t, cfg) - generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) @@ -211,12 +207,8 @@ func generateNonEmptyBlocks( // generate several non-empty blocks generateCtx, generateCtxCancel := context.WithCancel(context.Background()) - sub, err := fetcher.SubscribeNewBlockEvent(ctx) + sub, err := fetcher.SubscribeNewBlockEvent(generateCtx) require.NoError(t, err) - defer func() { - err = fetcher.Stop(ctx) - require.NoError(t, err) - }() go fillBlocks(t, generateCtx, cfg, cctx) diff --git a/core/fetcher.go b/core/fetcher.go index e47b913d96..2ba8241423 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "sync/atomic" "time" "github.com/gogo/protobuf/proto" @@ -13,7 +12,6 @@ import ( coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" "google.golang.org/grpc" - "google.golang.org/grpc/status" libhead "github.com/celestiaorg/go-header" ) @@ -34,10 +32,6 @@ var ( type BlockFetcher struct { client coregrpc.BlockAPIClient - - doneCh chan struct{} - cancel context.CancelFunc - isListeningForBlocks atomic.Bool } // NewBlockFetcher returns a new `BlockFetcher`. @@ -47,18 +41,6 @@ func NewBlockFetcher(conn *grpc.ClientConn) (*BlockFetcher, error) { }, nil } -// Stop stops the block fetcher. -// The underlying gRPC connection needs to be stopped separately. -func (f *BlockFetcher) Stop(ctx context.Context) error { - f.cancel() - select { - case <-f.doneCh: - return nil - case <-ctx.Done(): - return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) - } -} - // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { commit, err := f.Commit(ctx, height) @@ -162,72 +144,72 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V } // SubscribeNewBlockEvent subscribes to new block events from Core, returning -// a new block event channel on success. -func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - if f.isListeningForBlocks.Load() { - return nil, fmt.Errorf("already subscribed to new blocks") - } - ctx, cancel := context.WithCancel(ctx) - f.cancel = cancel - f.doneCh = make(chan struct{}) - f.isListeningForBlocks.Store(true) - - subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) - if err != nil { - close(f.doneCh) - f.isListeningForBlocks.Store(false) - return nil, err - } +// a new block event channel. +func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (chan types.EventDataSignedBlock, error) { + signedBlockCh := make(chan types.EventDataSignedBlock, 1) - log.Debug("created a subscription. Start listening for new blocks...") - signedBlockCh := make(chan types.EventDataSignedBlock) go func() { - defer close(f.doneCh) defer close(signedBlockCh) - defer func() { f.isListeningForBlocks.Store(false) }() for { select { case <-ctx.Done(): return default: - resp, err := subscription.Recv() - if err != nil { - log.Errorw("fetcher: error receiving new height", "err", err.Error()) - _, ok := status.FromError(err) // parsing the gRPC error - if ok { - // ok means that err contains a gRPC status error. - // move on another round of resubscribing. - return - } - continue - } - withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) - signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) - ctxCancel() - if err != nil { - log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) - // sleeping a bit to avoid retrying instantly and give time for the gRPC connection - // to recover automatically. - time.Sleep(time.Second) - continue - } - select { - case signedBlockCh <- types.EventDataSignedBlock{ - Header: *signedBlock.Header, - Commit: *signedBlock.Commit, - ValidatorSet: *signedBlock.ValidatorSet, - Data: *signedBlock.Data, - }: - case <-ctx.Done(): - return - } + } + + subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) + if err != nil { + // try re-subscribe in case of any errors that can come during subscription. gRPC + // retry mechanism has a back off on retries, so we don't need timers anymore. + log.Warnw("fetcher: failed to subscribe to new block events", "err", err) + continue + } + + log.Debug("fetcher: subscription created") + err = f.receive(ctx, signedBlockCh, subscription) + if err != nil { + log.Warnw("fetcher: error receiving new height", "err", err.Error()) + continue } } }() - return signedBlockCh, nil } +func (f *BlockFetcher) receive( + ctx context.Context, + signedBlockCh chan types.EventDataSignedBlock, + subscription coregrpc.BlockAPI_SubscribeNewHeightsClient, +) error { + log.Debug("fetcher: started listening for new blocks") + for { + resp, err := subscription.Recv() + if err != nil { + return err + } + + // TODO(@vgonkivs): make timeout configurable + withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) + signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) + ctxCancel() + if err != nil { + log.Warnw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) + continue + } + + select { + case signedBlockCh <- types.EventDataSignedBlock{ + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, + }: + case <-ctx.Done(): + return ctx.Err() + } + } +} + // IsSyncing returns the sync status of the Core connection: true for // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index d184fb8b91..ab3dc6c4b3 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -25,8 +25,6 @@ func TestBlockFetcherHeaderValues(t *testing.T) { client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - - // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) // read once from channel to generate next block @@ -56,5 +54,4 @@ func TestBlockFetcherHeaderValues(t *testing.T) { // compare ValidatorSet hash to the ValidatorsHash from first block height hexBytes := valSet.Hash() assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes) - require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 0f2c4dc93c..dec07d81a7 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -2,15 +2,12 @@ package core import ( "context" - "errors" "net" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - coregrpc "github.com/tendermint/tendermint/rpc/grpc" - "google.golang.org/grpc" ) func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { @@ -41,132 +38,60 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { require.NoError(t, ctx.Err()) } } - require.NoError(t, fetcher.Stop(ctx)) } -type mockAPIService struct { - coregrpc.UnimplementedBlockAPIServer - - grpcServer *grpc.Server - fetcher *BlockFetcher -} - -func (m *mockAPIService) SubscribeNewHeights( - _ *coregrpc.SubscribeNewHeightsRequest, - srv coregrpc.BlockAPI_SubscribeNewHeightsServer, -) error { - for i := 0; i < 20; i++ { - b, err := m.fetcher.GetBlock(context.Background(), int64(i)) - if err != nil { - return err - } - err = srv.Send(&coregrpc.NewHeightEvent{Height: b.Header.Height, Hash: b.Header.Hash()}) - if err != nil { - return err - } - time.Sleep(time.Second) - } - return nil -} - -func (m *mockAPIService) BlockByHeight( - req *coregrpc.BlockByHeightRequest, - srv coregrpc.BlockAPI_BlockByHeightServer, -) error { - b, err := m.fetcher.client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: req.Height}) - if err != nil { - return err - } - data, err := b.Recv() - if err != nil { - return err - } - err = srv.Send(data) - if err != nil { - return err - } - return nil -} - -func (m *mockAPIService) Start() error { - listener, err := net.Listen("tcp", ":50051") - if err != nil { - return err - } - - grpcServer := grpc.NewServer() - m.grpcServer = grpcServer - coregrpc.RegisterBlockAPIServer(grpcServer, m) - go func() { - err = grpcServer.Serve(listener) - if err != nil && !errors.Is(err, grpc.ErrServerStopped) { - panic(err) - } - }() - return nil -} - -func (m *mockAPIService) Stop() error { - m.grpcServer.Stop() - return nil -} - -func (m *mockAPIService) generateBlocksWithHeights(ctx context.Context, t *testing.T) { - cfg := DefaultTestConfig() - fetcher, cctx := createCoreFetcher(t, cfg) - m.fetcher = fetcher - generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx) - require.NoError(t, fetcher.Stop(ctx)) -} - -// TestStart_SubscribeNewBlockEvent_Resubscription ensures that subscription will not stuck in case +// TestFetcher_Resubscription ensures that subscription will not stuck in case // gRPC server was stopped. -func TestStart_SubscribeNewBlockEvent_Resubscription(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) +func TestFetcher_Resubscription(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) t.Cleanup(cancel) - m := &mockAPIService{} - m.generateBlocksWithHeights(ctx, t) - - require.NoError(t, m.Start()) - - client := newTestClient(t, "localhost", "50051") - + // run new consensus node + cfg := DefaultTestConfig() + tn := NewNetwork(t, cfg) + require.NoError(t, tn.Start()) + host, port, err := net.SplitHostPort(tn.GRPCClient.Target()) + require.NoError(t, err) + client := newTestClient(t, host, port) fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - // subscribe to block event to get blocks + + // subscribe to the channel to get new blocks + // and try to get one block newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) - select { case newBlockFromChan := <-newBlockChan: h := newBlockFromChan.Header.Height - _, err := fetcher.GetSignedBlock(ctx, h) + _, err = fetcher.GetSignedBlock(ctx, h) require.NoError(t, err) case <-ctx.Done(): - require.NoError(t, ctx.Err()) + t.Fatal("timeout waiting for block subscription") } + // stop the consensus node and wait some time to ensure that the subscription is stuck + // and there is no connection with the consensus node. + require.NoError(t, tn.Stop()) - require.NoError(t, m.Stop()) - - // stopping the server sends an error with the status code - // to client, so the subscription loop will be finished. - // check that newBlockChan was closed - _, ok := <-newBlockChan - require.False(t, ok) + waitCtx, waitCancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(waitCancel) + select { + case <-newBlockChan: + t.Fatal("blocks received after stopping") + case <-ctx.Done(): + t.Fatal("test finishes") + case <-waitCtx.Done(): + } - // start server and try to get a new subscription - require.NoError(t, m.Start()) - newBlockChan, err = fetcher.SubscribeNewBlockEvent(ctx) - require.NoError(t, err) + // start new consensus node(some components in app can't be restarted) + // on the same address and listen for the new blocks + tn = NewNetwork(t, cfg) + require.NoError(t, tn.Start()) select { case newBlockFromChan := <-newBlockChan: h := newBlockFromChan.Header.Height - _, err := fetcher.GetSignedBlock(ctx, h) + _, err = fetcher.GetSignedBlock(ctx, h) require.NoError(t, err) case <-ctx.Done(): - require.NoError(t, ctx.Err()) + t.Fatal("timeout waiting for block subscription") } - require.NoError(t, m.Stop()) - require.NoError(t, m.fetcher.Stop(ctx)) - require.NoError(t, fetcher.Stop(ctx)) + require.NoError(t, tn.Stop()) } diff --git a/core/header_test.go b/core/header_test.go index dcc5dba9e2..5dcf03cdf2 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -28,9 +28,9 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { require.NoError(t, err) sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) - <-sub + dataBlock := <-sub - height := int64(1) + height := dataBlock.Header.Height b, err := fetcher.GetBlock(ctx, height) require.NoError(t, err) diff --git a/core/listener.go b/core/listener.go index c020b84758..6dd2e7f964 100644 --- a/core/listener.go +++ b/core/listener.go @@ -18,12 +18,7 @@ import ( "github.com/celestiaorg/celestia-node/store" ) -var ( - tracer = otel.Tracer("core/listener") - retrySubscriptionDelay = 5 * time.Second - - errInvalidSubscription = errors.New("invalid subscription") -) +var tracer = otel.Tracer("core/listener") // Listener is responsible for listening to Core for // new block events and converting new Core blocks into @@ -101,21 +96,17 @@ func (cl *Listener) Start(context.Context) error { cl.cancel = cancel cl.closed = make(chan struct{}) - sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) + subs, err := cl.fetcher.SubscribeNewBlockEvent(ctx) if err != nil { return err } - go cl.runSubscriber(ctx, sub) + + go cl.listen(ctx, subs) return nil } // Stop stops the listener loop. func (cl *Listener) Stop(ctx context.Context) error { - err := cl.fetcher.Stop(ctx) - if err != nil { - log.Warnw("listener: stopping gRPC block event", "err", err) - } - cl.cancel() select { case <-cl.closed: @@ -125,63 +116,18 @@ func (cl *Listener) Stop(ctx context.Context) error { return ctx.Err() } - err = cl.metrics.Close() + err := cl.metrics.Close() if err != nil { log.Warnw("listener: closing metrics", "err", err) } return nil } -// runSubscriber runs a subscriber to receive event data of new signed blocks. It will attempt to -// resubscribe in case error happens during listening of subscription -func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDataSignedBlock) { - defer close(cl.closed) - for { - err := cl.listen(ctx, sub) - if ctx.Err() != nil { - // listener stopped because external context was canceled - return - } - if errors.Is(err, errInvalidSubscription) { - // stop node if there is a critical issue with the block subscription - log.Fatalf("listener: %v", err) //nolint:gocritic - } - - log.Warnw("listener: subscriber error, resubscribing...", "err", err) - sub = cl.resubscribe(ctx) - if sub == nil { - return - } - } -} - -func (cl *Listener) resubscribe(ctx context.Context) <-chan types.EventDataSignedBlock { - err := cl.fetcher.Stop(ctx) - if err != nil { - log.Warnw("listener: unsubscribe", "err", err) - } - - ticker := time.NewTicker(retrySubscriptionDelay) - defer ticker.Stop() - for { - sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) - if err == nil { - return sub - } - log.Errorw("listener: resubscribe", "err", err) - - select { - case <-ctx.Done(): - return nil - case <-ticker.C: - } - } -} - // listen kicks off a loop, listening for new block events from Core, // generating ExtendedHeaders and broadcasting them to the header-sub // gossipsub network. -func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSignedBlock) error { +func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSignedBlock) { + defer close(cl.closed) defer log.Info("listener: listening stopped") timeout := time.NewTimer(cl.listenerTimeout) defer timeout.Stop() @@ -189,13 +135,16 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned select { case b, ok := <-sub: if !ok { - return errors.New("underlying subscription was closed") + log.Error("underlying subscription was closed") + return } if cl.chainID != "" && b.Header.ChainID != cl.chainID { - log.Errorf("listener: received block with unexpected chain ID: expected %s,"+ - " received %s", cl.chainID, b.Header.ChainID) - return errInvalidSubscription + // stop node if there is a critical issue with the block subscription + panic(fmt.Sprintf("listener: received block with unexpected chain ID: expected %s,"+ + " received %s. blockHeight: %d blockHash: %x.", + cl.chainID, b.Header.ChainID, b.Header.Height, b.Header.Hash()), + ) } log.Debugw("listener: new block from core", "height", b.Header.Height) @@ -211,13 +160,13 @@ func (cl *Listener) listen(ctx context.Context, sub <-chan types.EventDataSigned if !timeout.Stop() { <-timeout.C } - timeout.Reset(cl.listenerTimeout) case <-timeout.C: cl.metrics.subscriptionStuck(ctx) - return errors.New("underlying subscription is stuck") + log.Error("underlying subscription is stuck") case <-ctx.Done(): - return ctx.Err() + return } + timeout.Reset(cl.listenerTimeout) } } diff --git a/core/listener_test.go b/core/listener_test.go index e8dec78360..565e5bdc2b 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -93,8 +93,7 @@ func TestListenerWithWrongChainRPC(t *testing.T) { sub, err := cl.fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) - err = cl.listen(ctx, sub) - assert.ErrorIs(t, err, errInvalidSubscription) + assert.Panics(t, func() { cl.listen(ctx, sub) }) } // TestListener_DoesNotStoreHistoric tests the (unlikely) case that @@ -136,6 +135,7 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) { require.NoError(t, err) assert.False(t, has) } + require.NoError(t, cl.Stop(ctx)) } func createMocknetWithTwoPubsubEndpoints(ctx context.Context, t *testing.T) (*pubsub.PubSub, *pubsub.PubSub) { diff --git a/core/testing.go b/core/testing.go index 586bf57f83..01bd748bba 100644 --- a/core/testing.go +++ b/core/testing.go @@ -3,12 +3,17 @@ package core import ( "context" "net" + "path/filepath" "testing" "time" + srvtypes "github.com/cosmos/cosmos-sdk/server/types" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/stretchr/testify/require" tmrand "github.com/tendermint/tendermint/libs/rand" + "github.com/tendermint/tendermint/node" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" @@ -69,9 +74,27 @@ func generateRandomAccounts(n int) []string { func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn { t.Helper() - opt := grpc.WithTransportCredentials(insecure.NewCredentials()) + + retryInterceptor := grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + retryStreamInterceptor := grpc_retry.StreamClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + + opts := []grpc.DialOption{ + grpc.WithUnaryInterceptor(retryInterceptor), + grpc.WithStreamInterceptor(retryStreamInterceptor), + grpc.WithTransportCredentials(insecure.NewCredentials()), + } endpoint := net.JoinHostPort(ip, port) - client, err := grpc.NewClient(endpoint, opt) + client, err := grpc.NewClient(endpoint, opts...) require.NoError(t, err) ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) t.Cleanup(cancel) @@ -79,3 +102,88 @@ func newTestClient(t *testing.T, ip, port string) *grpc.ClientConn { require.True(t, ready) return client } + +// Network wraps `testnode.Context` allowing to manually stop all underlying connections. +// TODO @vgonkivs: remove after https://github.com/celestiaorg/celestia-app/issues/4304 is done. +type Network struct { + testnode.Context + config *testnode.Config + app srvtypes.Application + tmNode *node.Node + + stopNode func() error + stopGRPC func() error + stopAPI func() error +} + +func NewNetwork(t testing.TB, config *testnode.Config) *Network { + t.Helper() + + // initialize the genesis file and validator files for the first validator. + baseDir := filepath.Join(t.TempDir(), "testnode") + err := genesis.InitFiles(baseDir, config.TmConfig, config.AppConfig, config.Genesis, 0) + require.NoError(t, err) + + tmNode, app, err := testnode.NewCometNode(baseDir, &config.UniversalTestingConfig) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(func() { + cancel() + }) + + cctx := testnode.NewContext( + ctx, + config.Genesis.Keyring(), + config.TmConfig, + config.Genesis.ChainID, + config.AppConfig.API.Address, + ) + + return &Network{ + Context: cctx, + config: config, + app: app, + tmNode: tmNode, + } +} + +func (n *Network) Start() error { + cctx, stopNode, err := testnode.StartNode(n.tmNode, n.Context) + if err != nil { + return err + } + cctx, cleanupGRPC, err := testnode.StartGRPCServer(n.app, n.config.AppConfig, cctx) + if err != nil { + return err + } + + apiServer, err := testnode.StartAPIServer(n.app, *n.config.AppConfig, cctx) + if err != nil { + return err + } + + n.Context = cctx + n.stopNode = stopNode + n.stopGRPC = cleanupGRPC + n.stopAPI = apiServer.Close + return nil +} + +func (n *Network) Stop() error { + err := n.stopNode() + if err != nil { + return err + } + + err = n.stopGRPC() + if err != nil { + return err + } + + err = n.stopAPI() + if err != nil { + return err + } + return nil +} diff --git a/go.mod b/go.mod index a365488ed4..892231341d 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grafana/otel-profiling-go v0.5.1 github.com/grafana/pyroscope-go v1.1.2 + github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.24.0 @@ -186,7 +187,6 @@ require ( github.com/gorilla/handlers v1.5.2 // indirect github.com/gorilla/websocket v1.5.3 // indirect github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect - github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index e4e7593e3f..f5213c1283 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -8,9 +8,12 @@ import ( "net" "os" "path/filepath" + "time" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "go.uber.org/fx" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -30,13 +33,34 @@ func grpcClient(lc fx.Lifecycle, cfg Config) (*grpc.ClientConn, error) { } else { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } + + retryInterceptor := grpc_retry.UnaryClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + retryStreamInterceptor := grpc_retry.StreamClientInterceptor( + grpc_retry.WithMax(5), + grpc_retry.WithCodes(codes.Unavailable), + grpc_retry.WithBackoff( + grpc_retry.BackoffExponentialWithJitter(time.Second, 2.0)), + ) + + opts = append(opts, + grpc.WithUnaryInterceptor(retryInterceptor), + grpc.WithStreamInterceptor(retryStreamInterceptor), + ) + if cfg.XTokenPath != "" { xToken, err := parseTokenPath(cfg.XTokenPath) if err != nil { return nil, err } - opts = append(opts, grpc.WithUnaryInterceptor(authInterceptor(xToken))) - opts = append(opts, grpc.WithStreamInterceptor(authStreamInterceptor(xToken))) + opts = append(opts, + grpc.WithChainUnaryInterceptor(authInterceptor(xToken), retryInterceptor), + grpc.WithChainStreamInterceptor(authStreamInterceptor(xToken), retryStreamInterceptor), + ) } endpoint := net.JoinHostPort(cfg.IP, cfg.Port)