Merge branch 'master' of github.com:ethpandaops/dora into skylenet/some-client-page-improvements
skylenet committed Aug 27, 2024
2 parents 3e434a5 + f51115d commit 29055f0
Showing 14 changed files with 530 additions and 377 deletions.
9 changes: 8 additions & 1 deletion clients/consensus/rpc/beaconstream.go
@@ -93,7 +93,14 @@ func (bs *BeaconStream) startStream() {
Ready: true,
}
case err := <-stream.Errors:
bs.logger.Warnf("beacon block stream error: %v", err)
if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") {
// this seems to be a go bug, silently reconnect to the stream
time.Sleep(10 * time.Millisecond)
stream.RetryNow()
} else {
bs.logger.Warnf("beacon block stream error: %v", err)
}

select {
case bs.ReadyChan <- &BeaconStreamStatus{
Ready: false,
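
The hunk above special-cases a transient "INTERNAL_ERROR; received from peer" stream reset: instead of logging a warning and waiting out the normal backoff, it pauses briefly and forces an immediate reconnect. The following is an illustrative, self-contained sketch of that error-matching pattern, not part of this commit; handleStreamError, retryNow and warnf are hypothetical stand-ins for the real stream.RetryNow() and bs.logger.Warnf calls.

package main

import (
    "errors"
    "fmt"
    "strings"
    "time"
)

// handleStreamError reconnects silently on the known transient error and
// logs a warning for everything else.
func handleStreamError(err error, retryNow func(), warnf func(string, ...any)) {
    if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") {
        // treated as a transient stream reset: back off very briefly,
        // then force an immediate reconnect without logging a warning
        time.Sleep(10 * time.Millisecond)
        retryNow()
        return
    }
    warnf("beacon block stream error: %v", err)
}

func main() {
    handleStreamError(
        errors.New("stream error: stream ID 5; INTERNAL_ERROR; received from peer"),
        func() { fmt.Println("retrying immediately") },
        func(format string, args ...any) { fmt.Printf("WARN: "+format+"\n", args...) },
    )
}
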
16 changes: 15 additions & 1 deletion clients/consensus/rpc/eventstream/eventstream.go
@@ -1,6 +1,7 @@
package eventstream

import (
"context"
"errors"
"fmt"
"io"
@@ -34,6 +35,8 @@ type Stream struct {
isClosed bool
// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
closeMutex sync.Mutex
// retrySleepCancel is a function that can be called to cancel the retry sleep
retrySleepCancel context.CancelFunc
}

type StreamEvent interface {
@@ -114,6 +117,13 @@ func (stream *Stream) Close() {
}()
}

// RetryNow forces a disconnected stream to reconnect immediately, skipping the remaining backoff delay.
func (stream *Stream) RetryNow() {
if cancelFn := stream.retrySleepCancel; cancelFn != nil {
cancelFn()
}
}

// Go's http package doesn't copy headers across when it encounters
// redirects so we need to do that manually.
func checkRedirect(req *http.Request, via []*http.Request) error {
@@ -215,7 +225,11 @@ func (stream *Stream) retryRestartStream() {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}

time.Sleep(backoff)
ctx, cancel := context.WithTimeout(context.Background(), backoff)
stream.retrySleepCancel = cancel
<-ctx.Done()

stream.retrySleepCancel = nil

if stream.isClosed {
return
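
The essential change in eventstream.go is replacing an uninterruptible time.Sleep(backoff) with a context-based wait whose cancel function is stored on the stream, so RetryNow can cut the backoff short from another goroutine. A minimal standalone sketch of that pattern follows; it is not the eventstream package itself, and unlike the committed code it guards the cancel function with a mutex so the example is race-free on its own.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type retrier struct {
    mu     sync.Mutex
    cancel context.CancelFunc
}

// sleep waits for the backoff duration, but returns early if RetryNow is called.
func (r *retrier) sleep(backoff time.Duration) {
    ctx, cancel := context.WithTimeout(context.Background(), backoff)
    r.mu.Lock()
    r.cancel = cancel
    r.mu.Unlock()

    <-ctx.Done() // fires on timeout or on RetryNow

    r.mu.Lock()
    r.cancel = nil
    r.mu.Unlock()
    cancel() // release the context's resources
}

// RetryNow cuts the current backoff sleep short, if one is in progress.
func (r *retrier) RetryNow() {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.cancel != nil {
        r.cancel()
    }
}

func main() {
    r := &retrier{}
    go func() {
        time.Sleep(100 * time.Millisecond)
        r.RetryNow() // wake the sleeper early
    }()

    start := time.Now()
    r.sleep(10 * time.Second) // would block the full 10s without RetryNow
    fmt.Printf("woke up after %v\n", time.Since(start).Round(time.Millisecond))
}
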
13 changes: 13 additions & 0 deletions db/forks.go
@@ -124,3 +124,16 @@ func UpdateFinalizedForkParents(finalizedRoots [][]byte, tx *sqlx.Tx) error {

return nil
}

func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`
UPDATE forks
SET parent_fork = $1
WHERE base_root = $2
`, parentForkId, parentRoot)
if err != nil {
return err
}

return nil
}
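
UpdateForkParent re-points a single fork, identified by its base_root, at a new parent fork inside an existing transaction. For context, a hypothetical caller might compose it with the db.RunDBTransaction wrapper that appears elsewhere in this commit (client.go); linkForkToParent and the "github.com/ethpandaops/dora/db" import path are assumptions for illustration, not code from this commit.

package example

import (
    "fmt"

    "github.com/attestantio/go-eth2-client/spec/phase0"
    "github.com/ethpandaops/dora/db"
    "github.com/jmoiron/sqlx"
)

// linkForkToParent updates the parent fork of the fork rooted at baseRoot,
// wrapped in a transaction so it stays atomic with other writes.
func linkForkToParent(baseRoot phase0.Root, parentForkId uint64) error {
    err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
        return db.UpdateForkParent(baseRoot[:], parentForkId, tx)
    })
    if err != nil {
        return fmt.Errorf("error updating fork parent: %v", err)
    }

    return nil
}
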
69 changes: 47 additions & 22 deletions indexer/beacon/blockcache.go
@@ -18,15 +18,17 @@ type blockCache struct {
lowestSlot int64
slotMap map[phase0.Slot][]*Block
rootMap map[phase0.Root]*Block
parentMap map[phase0.Root][]*Block
latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes)
}

// newBlockCache creates a new instance of blockCache.
func newBlockCache(indexer *Indexer) *blockCache {
return &blockCache{
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
indexer: indexer,
slotMap: map[phase0.Slot][]*Block{},
rootMap: map[phase0.Root]*Block{},
parentMap: map[phase0.Root][]*Block{},
}
}

@@ -60,6 +62,25 @@ func (cache *blockCache) createOrGetBlock(root phase0.Root, slot phase0.Slot) (*
return cacheBlock, true
}

// addBlockToParentMap adds the given block to the parent map.
func (cache *blockCache) addBlockToParentMap(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

parentRoot := block.GetParentRoot()
if parentRoot == nil {
return
}

for _, parentBlock := range cache.parentMap[*parentRoot] {
if parentBlock == block {
return
}
}

cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block)
}

// getBlockByRoot returns the cached block with the given root.
func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block {
cache.cacheMutex.RLock()
@@ -86,27 +107,13 @@ func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

parentBlock := cache.rootMap[parentRoot]

resBlocks := []*Block{}
for slot, blocks := range cache.slotMap {
if parentBlock != nil && slot <= parentBlock.Slot {
continue
}

for _, block := range blocks {
blockParentRoot := block.GetParentRoot()
if blockParentRoot == nil {
continue
}

if bytes.Equal((*blockParentRoot)[:], parentRoot[:]) {
resBlocks = append(resBlocks, block)
}
}
cachedBlocks := cache.parentMap[parentRoot]
blocks := make([]*Block, len(cachedBlocks))
if len(blocks) > 0 {
copy(blocks, cachedBlocks)
}

return resBlocks
return blocks
}

// getBlockByStateRoot returns the block with the given state root.
@@ -265,8 +272,10 @@ func (cache *blockCache) removeBlock(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

// remove the block from the root map.
delete(cache.rootMap, block.Root)

// remove the block from the slot map.
slotBlocks := cache.slotMap[block.Slot]
if len(slotBlocks) == 1 && slotBlocks[0] == block {
delete(cache.slotMap, block.Slot)
@@ -278,6 +287,22 @@ func (cache *blockCache) removeBlock(block *Block) {
}
}
}

// remove the block from the parent map.
if parentRoot := block.GetParentRoot(); parentRoot != nil {
parentBlocks := cache.parentMap[*parentRoot]
if len(parentBlocks) == 1 && parentBlocks[0] == block {
delete(cache.parentMap, *parentRoot)
} else if len(parentBlocks) > 1 {
for i, parentBlock := range parentBlocks {
if parentBlock == block {
cache.parentMap[*parentRoot] = append(parentBlocks[:i], parentBlocks[i+1:]...)
break
}
}
}
}

}

// getEpochBlocks returns the blocks that belong to the specified epoch.
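
The blockcache.go changes introduce a secondary index: every block is also keyed by its parent root, so getBlocksByParentRoot becomes a single map lookup plus a defensive copy instead of a scan over every cached slot, and removeBlock keeps both maps consistent. The following is a self-contained sketch of that children-by-parent index with simplified stand-in types (root, block and childIndex are illustrative, not the dora types, and the real cache's locking is omitted).

package main

import "fmt"

type root [32]byte

type block struct {
    root   root
    parent root
}

// childIndex pairs a primary by-root map with a children-by-parent map,
// mirroring the rootMap/parentMap pair in blockcache.go.
type childIndex struct {
    byRoot   map[root]*block
    byParent map[root][]*block
}

func newChildIndex() *childIndex {
    return &childIndex{
        byRoot:   map[root]*block{},
        byParent: map[root][]*block{},
    }
}

// add indexes the block under its root and its parent root, skipping duplicates.
func (idx *childIndex) add(b *block) {
    idx.byRoot[b.root] = b
    for _, existing := range idx.byParent[b.parent] {
        if existing == b {
            return // already indexed
        }
    }
    idx.byParent[b.parent] = append(idx.byParent[b.parent], b)
}

// remove deletes the block from both maps, dropping the parent entry
// when its last child disappears.
func (idx *childIndex) remove(b *block) {
    delete(idx.byRoot, b.root)

    children := idx.byParent[b.parent]
    for i, existing := range children {
        if existing == b {
            idx.byParent[b.parent] = append(children[:i], children[i+1:]...)
            break
        }
    }
    if len(idx.byParent[b.parent]) == 0 {
        delete(idx.byParent, b.parent)
    }
}

// childrenOf is the O(1) lookup that replaces the old scan over all slots.
func (idx *childIndex) childrenOf(parent root) []*block {
    children := make([]*block, len(idx.byParent[parent]))
    copy(children, idx.byParent[parent])
    return children
}

func main() {
    idx := newChildIndex()
    parent := root{1}
    a := &block{root: root{2}, parent: parent}
    b := &block{root: root{3}, parent: parent}
    idx.add(a)
    idx.add(b)
    fmt.Println("children:", len(idx.childrenOf(parent))) // 2
    idx.remove(a)
    fmt.Println("children:", len(idx.childrenOf(parent))) // 1
}

Returning a copy from childrenOf mirrors getBlocksByParentRoot handing out a copied slice, so callers cannot mutate the cache's internal slice after the lock is released.
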
61 changes: 51 additions & 10 deletions indexer/beacon/client.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"runtime/debug"
"strings"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
@@ -105,6 +106,25 @@ func (c *Client) startClientLoop() {
}
}

func (c *Client) formatProcessingTimes(processingTimes []time.Duration) string {
if len(processingTimes) == 0 {
return ""
}

str := strings.Builder{}
str.WriteString(" (")
for i, pt := range processingTimes {
if i > 0 {
str.WriteString(", ")
}

str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds()))
}
str.WriteString(")")

return str.String()
}

// runClientLoop runs the client event processing subroutine.
func (c *Client) runClientLoop() error {
// 1 - load & process head block
@@ -116,7 +136,7 @@ func (c *Client) runClientLoop() error {

c.headRoot = headRoot

headBlock, isNew, err := c.processBlock(headSlot, headRoot, nil)
headBlock, isNew, processingTimes, err := c.processBlock(headSlot, headRoot, nil)
if err != nil {
return fmt.Errorf("failed processing head block: %v", err)
}
@@ -126,9 +146,9 @@ func (c *Client) runClientLoop() error {
}

if isNew {
c.logger.Infof("received block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot)
c.logger.Infof("received block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot)
c.logger.Debugf("received known block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes))
}

// 2 - backfill old blocks up to the finalization checkpoint or known in cache
Expand Down Expand Up @@ -263,17 +283,17 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error {

// processStreamBlock processes a block received from the stream (either via block or head events).
func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, error) {
block, isNew, err := c.processBlock(slot, root, nil)
block, isNew, processingTimes, err := c.processBlock(slot, root, nil)
if err != nil {
return nil, err
}

chainState := c.client.GetPool().GetChainState()

if isNew {
c.logger.Infof("received block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:])
c.logger.Infof("received block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:])
c.logger.Debugf("received known block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes))
}

return block, nil
@@ -323,9 +343,10 @@ func (c *Client) processReorg(oldHead *Block, newHead *Block) error {
}

// processBlock processes a block (from stream & polling).
func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, err error) {
func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, processingTimes []time.Duration, err error) {
chainState := c.client.GetPool().GetChainState()
finalizedSlot := chainState.GetFinalizedSlot()
processingTimes = make([]time.Duration, 3)

if slot < finalizedSlot {
// block is in finalized epoch
@@ -349,20 +370,34 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return header, nil
}

t1 := time.Now()
defer func() {
processingTimes[0] += time.Since(t1)
}()

return LoadBeaconHeader(c.getContext(), c, root)
})
if err != nil {
return
}

isNew, err = block.EnsureBlock(func() (*spec.VersionedSignedBeaconBlock, error) {

t1 := time.Now()
defer func() {
processingTimes[0] += time.Since(t1)
}()

return LoadBeaconBlock(c.getContext(), c, root)
})
if err != nil {
return
}

if slot >= finalizedSlot && isNew {
c.indexer.blockCache.addBlockToParentMap(block)
t1 := time.Now()

// fork detection
err2 := c.indexer.forkCache.processBlock(block)
if err2 != nil {
@@ -376,6 +411,9 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return
}

processingTimes[1] = time.Since(t1)
t1 = time.Now()

// write to db
err = db.RunDBTransaction(func(tx *sqlx.Tx) error {
err := db.InsertUnfinalizedBlock(dbBlock, tx)
@@ -389,6 +427,8 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0
return
}

processingTimes[2] = time.Since(t1)

block.isInUnfinalizedDb = true
c.indexer.blockCache.latestBlock = block
}
@@ -444,19 +484,20 @@ func (c *Client) backfillParentBlocks(headBlock *Block) error {
break
}

var processingTimes []time.Duration
if parentBlock == nil {
var err error

parentBlock, isNewBlock, err = c.processBlock(parentSlot, parentRoot, parentHead)
parentBlock, isNewBlock, processingTimes, err = c.processBlock(parentSlot, parentRoot, parentHead)
if err != nil {
return fmt.Errorf("could not process block [0x%x]: %v", parentRoot, err)
}
}

if isNewBlock {
c.logger.Infof("received block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot)
c.logger.Infof("received block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes))
} else {
c.logger.Debugf("received known block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot)
c.logger.Debugf("received known block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes))
}

if parentSlot == 0 {
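
The client.go changes add lightweight per-stage timing: processBlock fills a three-element []time.Duration (roughly: header/block loading, fork detection and db-object construction, and the database write), and formatProcessingTimes renders those durations as a parenthesized millisecond list appended to the "received block" log lines. A standalone sketch of both halves follows; the formatter mirrors the committed helper, while the stage closures are placeholders, not the real loading, indexing or database calls.

package main

import (
    "fmt"
    "strings"
    "time"
)

// formatProcessingTimes renders e.g. " (12 ms, 3 ms, 7 ms)",
// or an empty string when there is nothing to report.
func formatProcessingTimes(processingTimes []time.Duration) string {
    if len(processingTimes) == 0 {
        return ""
    }

    str := strings.Builder{}
    str.WriteString(" (")
    for i, pt := range processingTimes {
        if i > 0 {
            str.WriteString(", ")
        }
        str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds()))
    }
    str.WriteString(")")

    return str.String()
}

func main() {
    // three buckets, matching processBlock: 0 = load, 1 = fork detection, 2 = db write
    processingTimes := make([]time.Duration, 3)

    stages := []func(){
        func() { time.Sleep(12 * time.Millisecond) }, // placeholder for header/block loading
        func() { time.Sleep(3 * time.Millisecond) },  // placeholder for fork detection
        func() { time.Sleep(7 * time.Millisecond) },  // placeholder for the db transaction
    }
    for i, stage := range stages {
        t1 := time.Now()
        stage()
        processingTimes[i] = time.Since(t1)
    }

    fmt.Printf("received block 282:9031 [0x1234] head%v\n", formatProcessingTimes(processingTimes))
}
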
6 changes: 5 additions & 1 deletion indexer/beacon/finalization.go
@@ -291,8 +291,12 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}

canonicalRoots := make([][]byte, len(canonicalBlocks))
canonicalBlockHashes := make([][]byte, len(canonicalBlocks))
for i, block := range canonicalBlocks {
canonicalRoots[i] = block.Root[:]
if blockIndex := block.GetBlockIndex(); blockIndex != nil {
canonicalBlockHashes[i] = blockIndex.ExecutionHash[:]
}
}

t1dur := time.Since(t1) - t1loading
@@ -330,7 +334,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
return fmt.Errorf("error persisting sync committee assignments to db: %v", err)
}

if err := db.UpdateMevBlockByEpoch(uint64(epoch), specs.SlotsPerEpoch, canonicalRoots, tx); err != nil {
if err := db.UpdateMevBlockByEpoch(uint64(epoch), specs.SlotsPerEpoch, canonicalBlockHashes, tx); err != nil {
return fmt.Errorf("error while updating mev block proposal state: %v", err)
}

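
The finalization.go fix separates two identifiers that are easy to conflate: the beacon block root (a consensus-layer hash tree root) and the execution block hash carried in the block's payload. UpdateMevBlockByEpoch now receives the execution hashes rather than the roots; the apparent motivation is that MEV block proposals are matched by execution block hash, though that rationale is inferred from the rename, not stated in the commit. A small sketch of the collection step with stand-in types (block and blockIndex here are illustrative, not the indexer's types):

package main

import "fmt"

type root [32]byte
type hash [32]byte

// blockIndex stands in for the indexer's block index entry; only the
// execution hash matters for this sketch.
type blockIndex struct {
    executionHash hash
}

type block struct {
    root  root
    index *blockIndex // nil when no execution payload data is available
}

func main() {
    canonicalBlocks := []*block{
        {root: root{0x01}, index: &blockIndex{executionHash: hash{0xaa}}},
        {root: root{0x02}, index: nil}, // e.g. payload not indexed
    }

    canonicalRoots := make([][]byte, len(canonicalBlocks))
    canonicalBlockHashes := make([][]byte, len(canonicalBlocks))
    for i, b := range canonicalBlocks {
        canonicalRoots[i] = b.root[:]
        if b.index != nil {
            canonicalBlockHashes[i] = b.index.executionHash[:]
        }
        // entries stay nil when there is no execution hash, mirroring
        // the nil-guard on GetBlockIndex() in finalizeEpoch
    }

    fmt.Printf("canonical roots:  %x\n", canonicalRoots)
    fmt.Printf("execution hashes: %x\n", canonicalBlockHashes)
}
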
6 changes: 3 additions & 3 deletions indexer/beacon/fork.go
@@ -21,11 +21,11 @@ type Fork struct {
}

// newFork creates a new Fork instance.
func newFork(forkId ForkKey, baseBlock *Block, leafBlock *Block, parentFork ForkKey) *Fork {
func newFork(forkId ForkKey, baseSlot phase0.Slot, baseRoot phase0.Root, leafBlock *Block, parentFork ForkKey) *Fork {
fork := &Fork{
forkId: forkId,
baseSlot: baseBlock.Slot,
baseRoot: baseBlock.Root,
baseSlot: baseSlot,
baseRoot: baseRoot,
leafSlot: leafBlock.Slot,
leafRoot: leafBlock.Root,
parentFork: parentFork,