Skip to content

Commit

Permalink
chore: Remove skip height (#291)
Browse files Browse the repository at this point in the history
closes #288
  • Loading branch information
Lazar955 authored Jan 17, 2025
1 parent d506aa9 commit 46c75a4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 197 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
* [#262](https://github.com/babylonlabs-io/finality-provider/pull/262) Add new command to export pop
* [#284](https://github.com/babylonlabs-io/finality-provider/pull/284) Add new command to delete pop
* [#277](https://github.com/babylonlabs-io/finality-provider/pull/277) Poll many blocks in poller
* [#291](https://github.com/babylonlabs-io/finality-provider/pull/291) chore: remove skip height

## v0.14.3

Expand Down
105 changes: 17 additions & 88 deletions finality-provider/service/chain_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,17 @@ const (
maxFailedCycles = 20
)

type skipHeightRequest struct {
height uint64
resp chan *skipHeightResponse
}

type skipHeightResponse struct {
err error
}

type ChainPoller struct {
isStarted *atomic.Bool
wg sync.WaitGroup
quit chan struct{}

cc clientcontroller.ClientController
cfg *cfg.ChainPollerConfig
metrics *metrics.FpMetrics
blockInfoChan chan *types.BlockInfo
skipHeightChan chan *skipHeightRequest
nextHeight uint64
logger *zap.Logger
mu sync.RWMutex
mu sync.RWMutex
wg sync.WaitGroup
isStarted *atomic.Bool
quit chan struct{}
cc clientcontroller.ClientController
cfg *cfg.ChainPollerConfig
metrics *metrics.FpMetrics
blockInfoChan chan *types.BlockInfo
logger *zap.Logger
nextHeight uint64
}

func NewChainPoller(
Expand All @@ -57,14 +46,13 @@ func NewChainPoller(
metrics *metrics.FpMetrics,
) *ChainPoller {
return &ChainPoller{
isStarted: atomic.NewBool(false),
logger: logger,
cfg: cfg,
cc: cc,
metrics: metrics,
blockInfoChan: make(chan *types.BlockInfo, cfg.BufferSize),
skipHeightChan: make(chan *skipHeightRequest),
quit: make(chan struct{}),
isStarted: atomic.NewBool(false),
logger: logger,
cfg: cfg,
cc: cc,
metrics: metrics,
blockInfoChan: make(chan *types.BlockInfo, cfg.BufferSize),
quit: make(chan struct{}),
}
}

Expand Down Expand Up @@ -268,62 +256,12 @@ func (cp *ChainPoller) pollChain() {
select {
case <-ticker.C:
continue
case req := <-cp.skipHeightChan:
// no need to skip heights if the target height is not higher
// than the next height to retrieve
targetHeight := req.height
if targetHeight <= cp.nextHeight {
resp := &skipHeightResponse{
err: fmt.Errorf(
"the target height %d is not higher than the next height %d to retrieve",
targetHeight, cp.nextHeight)}
req.resp <- resp

continue
}

// drain blocks that can be skipped from blockInfoChan
cp.clearChanBufferUpToHeight(targetHeight)

// set the next height to the skip height
cp.setNextHeight(targetHeight)

cp.logger.Debug("the poller has skipped height(s)",
zap.Uint64("next_height", req.height))

req.resp <- &skipHeightResponse{}

case <-cp.quit:
return
}
}
}

func (cp *ChainPoller) SkipToHeight(height uint64) error {
if !cp.IsRunning() {
return fmt.Errorf("the chain poller is stopped")
}

respChan := make(chan *skipHeightResponse, 1)

// this handles the case when the poller is stopped before the
// skip height request is sent
select {
case <-cp.quit:
return fmt.Errorf("the chain poller is stopped")
case cp.skipHeightChan <- &skipHeightRequest{height: height, resp: respChan}:
}

// this handles the case when the poller is stopped before
// the skip height request is returned
select {
case <-cp.quit:
return fmt.Errorf("the chain poller is stopped")
case resp := <-respChan:
return resp.err
}
}

func (cp *ChainPoller) NextHeight() uint64 {
cp.mu.RLock()
defer cp.mu.RUnlock()
Expand All @@ -337,12 +275,3 @@ func (cp *ChainPoller) setNextHeight(height uint64) {

cp.nextHeight = height
}

func (cp *ChainPoller) clearChanBufferUpToHeight(upToHeight uint64) {
for len(cp.blockInfoChan) > 0 {
block := <-cp.blockInfoChan
if block.Height+1 >= upToHeight {
break
}
}
}
109 changes: 0 additions & 109 deletions finality-provider/service/chain_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package service_test

import (
"math/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -68,111 +67,3 @@ func FuzzChainPoller_Start(f *testing.F) {
}
})
}

// FuzzChainPoller_SkipHeight tests the functionality of SkipHeight
func FuzzChainPoller_SkipHeight(f *testing.F) {
testutil.AddRandomSeedsToFuzzer(f, 10)

f.Fuzz(func(t *testing.T, seed int64) {
t.Parallel()
r := rand.New(rand.NewSource(seed))

currentHeight := uint64(r.Int63n(100) + 1)
startHeight := currentHeight + 1
endHeight := startHeight + uint64(r.Int63n(10)+2)
skipHeight := endHeight + uint64(r.Int63n(10)+2)

ctl := gomock.NewController(t)
mockClientController := mocks.NewMockClientController(ctl)
mockClientController.EXPECT().Close().Return(nil).AnyTimes()
mockClientController.EXPECT().QueryActivatedHeight().Return(uint64(1), nil).AnyTimes()

currentBlockRes := &types.BlockInfo{
Height: endHeight,
}
mockClientController.EXPECT().QueryBestBlock().Return(currentBlockRes, nil).AnyTimes()

var resBlocks []*types.BlockInfo
for i := startHeight; i <= endHeight; i++ {
resBlock := &types.BlockInfo{
Height: i,
}
resBlocks = append(resBlocks, resBlock)
}

pollerCfg := fpcfg.DefaultChainPollerConfig()

mockClientController.EXPECT().QueryBlocks(startHeight, endHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(endHeight+1, endHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(startHeight, skipHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()

resBlocks = append(resBlocks, &types.BlockInfo{
Height: skipHeight,
})
mockClientController.EXPECT().QueryBlocks(skipHeight, endHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(skipHeight+1, endHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()
mockClientController.EXPECT().QueryBlocks(skipHeight+1, skipHeight, pollerCfg.PollSize).Return(resBlocks, nil).AnyTimes()

m := metrics.NewFpMetrics()
pollerCfg.PollInterval = 1 * time.Second
poller := service.NewChainPoller(testutil.GetTestLogger(t), &pollerCfg, mockClientController, m)
// should expect error if the poller is not started
err := poller.SkipToHeight(skipHeight)
require.Error(t, err)
err = poller.Start(startHeight)
require.NoError(t, err)
defer func() {
err := poller.Stop()
require.NoError(t, err)
// should expect error if the poller is stopped
err = poller.SkipToHeight(skipHeight)
require.Error(t, err)
}()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// insert a skipToHeight request with height lower than the next
// height to retrieve, expecting an error
err = poller.SkipToHeight(poller.NextHeight() - 1)
require.Error(t, err)
// insert a skipToHeight request with a height higher than the
// next height to retrieve
err = poller.SkipToHeight(skipHeight)
require.NoError(t, err)
}()

skipped := false
seenHeight := map[uint64]struct{}{}
for uint64(len(seenHeight)) <= endHeight-startHeight {
if skipped {
break
}
select {
case info := <-poller.GetBlockInfoChan():
if info.Height == skipHeight {
skipped = true
} else {
seenHeight[info.Height] = struct{}{}
}
case <-time.After(10 * time.Second):
t.Fatalf("Failed to get block info")
}
}

for i := startHeight; i <= endHeight; i++ {
if i == skipHeight {
break
}
if _, ok := seenHeight[i]; !ok {
t.Fatalf("height %d not seen", i)
}
}

wg.Wait()
require.Eventually(t, func() bool {
return skipHeight+1 == poller.NextHeight()
}, eventuallyWaitTimeOut, eventuallyPollTime)
})
}

0 comments on commit 46c75a4

Please sign in to comment.