diff --git a/core/fetcher.go b/core/fetcher.go index f2b160e108..7cc9cd70a7 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -1,3 +1,5 @@ +// Package core provides fundamental blockchain interaction functionality, +// including block fetching, subscription handling, and validator set management. package core import ( @@ -12,13 +14,18 @@ import ( libhead "github.com/celestiaorg/go-header" ) +// Constants for event subscription const newBlockSubscriber = "NewBlock/Events" var ( - log = logging.Logger("core") + // log is the package-level logger + log = logging.Logger("core") + // newDataSignedBlockQuery defines the query string for subscribing to signed block events newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String() ) +// BlockFetcher provides functionality to fetch blocks, commits, and validator sets +// from a Tendermint Core node. It also supports subscribing to new block events. type BlockFetcher struct { client Client @@ -33,48 +40,49 @@ func NewBlockFetcher(client Client) *BlockFetcher { } } -// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. +// GetBlockInfo queries Core for additional block information, including Commit and ValidatorSet. +// If height is nil, it fetches information for the latest block. +// Returns an error if either the commit or validator set cannot be retrieved. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { commit, err := f.Commit(ctx, height) if err != nil { - return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) + return nil, nil, fmt.Errorf("failed to get commit at height %v: %w", height, err) } - // If a nil `height` is given as a parameter, there is a chance - // that a new block could be produced between getting the latest - // commit and getting the latest validator set. Therefore, it is - // best to get the validator set at the latest commit's height to - // prevent this potential inconsistency. + // If a nil height is given, we use the commit's height to ensure consistency + // between the commit and validator set valSet, err := f.ValidatorSet(ctx, &commit.Height) if err != nil { - return nil, nil, fmt.Errorf("core/fetcher: getting validator set at height %d: %w", height, err) + return nil, nil, fmt.Errorf("failed to get validator set at height %v: %w", height, err) } return commit, valSet, nil } -// GetBlock queries Core for a `Block` at the given height. +// GetBlock retrieves a block at the specified height from Core. +// If height is nil, it fetches the latest block. func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) { res, err := f.client.Block(ctx, height) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get block: %w", err) } if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, height: %d", height) + return nil, fmt.Errorf("block not found at height %v", height) } return res.Block, nil } +// GetBlockByHash retrieves a block with the specified hash from Core. func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { res, err := f.client.BlockByHash(ctx, hash) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get block by hash: %w", err) } if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, hash: %s", hash.String()) + return nil, fmt.Errorf("block not found with hash %s", hash.String()) } return res.Block, nil @@ -103,32 +111,69 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit // ValidatorSet queries Core for the ValidatorSet from the // block at the given height. func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) { - perPage := 100 + // Validate height if provided + if height != nil && *height < 0 { + return nil, fmt.Errorf("invalid height: %d, height must be non-negative", *height) + } + + const ( + perPage = 100 // number of validators per page + maxPages = 100 // protection against too many iterations + ) + + vals := make([]*types.Validator, 0) + total := -1 + page := 1 + + for len(vals) != total { + // Protection against too many iterations + if page > maxPages { + return nil, fmt.Errorf("exceeded maximum number of pages (%d) while fetching validator set", maxPages) + } - vals, total := make([]*types.Validator, 0), -1 - for page := 1; len(vals) != total; page++ { res, err := f.client.Validators(ctx, height, &page, &perPage) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to fetch validators at height %v, page %d: %w", height, page, err) } - if res != nil && len(res.Validators) == 0 { - return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) + if res == nil { + return nil, fmt.Errorf("received nil response while fetching validators at height %v, page %d", height, page) + } + + if len(res.Validators) == 0 { + if page == 1 { + return nil, fmt.Errorf("validator set not found at height %v", height) + } + // This shouldn't happen if total was correct + return nil, fmt.Errorf("unexpected empty validator page %d when total is %d", page, total) + } + + // Initialize total on first page + if total == -1 { + total = res.Total + // Pre-allocate the slice with the total size + vals = make([]*types.Validator, 0, total) } - total = res.Total vals = append(vals, res.Validators...) + page++ + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled while fetching validators: %w", ctx.Err()) + default: + } } return types.NewValidatorSet(vals), nil } -// SubscribeNewBlockEvent subscribes to new block events from Core, returning -// a new block event channel on success. +// SubscribeNewBlockEvent subscribes to new block events from Core. +// Returns a channel that receives new block events and an error if the subscription fails. +// The subscription can be cancelled using UnsubscribeNewBlockEvent or when the context is cancelled. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // start the client if not started yet if !f.client.IsRunning() { - return nil, errors.New("client not running") + return nil, errors.New("client is not running") } ctx, cancel := context.WithCancel(ctx) @@ -137,7 +182,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types eventChan, err := f.client.Subscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to subscribe to new blocks: %w", err) } signedBlockCh := make(chan types.EventDataSignedBlock) @@ -150,7 +195,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types return case newEvent, ok := <-eventChan: if !ok { - log.Errorw("fetcher: new blocks subscription channel closed unexpectedly") + log.Errorw("new blocks subscription channel closed unexpectedly") return } signedBlock := newEvent.Data.(types.EventDataSignedBlock)