Skip to content

Commit

Permalink
bqueue: add headers to the queue
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 27, 2025
1 parent 64c4de4 commit 30c2f9b
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 52 deletions.
4 changes: 2 additions & 2 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Ledger interface {

// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
Put(block any) error
}

// Service represents a consensus instance.
Expand Down Expand Up @@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error {
bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb))

if err := s.BlockQueue.PutBlock(bb); err != nil {
if err := s.BlockQueue.Put(bb); err != nil {
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,8 +528,12 @@ type testBlockQueuer struct {
var _ = BlockQueuer(testBlockQueuer{})

// PutBlock implements BlockQueuer interface.
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
return bq.bc.AddBlock(b)
func (bq testBlockQueuer) Put(b any) error {
blk, ok := b.(*coreb.Block)
if !ok {
return nil
}
return bq.bc.AddBlock(blk)
}

func getTestValidator(i int) (*keys.PrivateKey, *keys.PublicKey) {
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type auxBlockIn struct {
Transactions []json.RawMessage `json:"tx"`
}

// GetIndex returns the index of the block.
func (b *Block) GetIndex() uint32 {
return b.Header.Index
}

// ComputeMerkleRoot computes Merkle tree root hash based on actual block's data.
func (b *Block) ComputeMerkleRoot() util.Uint256 {
hashes := make([]util.Uint256, len(b.Transactions))
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ type baseAux struct {
Witnesses []transaction.Witness `json:"witnesses"`
}

// GetIndex returns the index of the block.
func (b *Header) GetIndex() uint32 {
return b.Index
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down
5 changes: 5 additions & 0 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {

return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest header.
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
}
105 changes: 76 additions & 29 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,81 +14,108 @@ type Blockqueuer interface {
AddBlock(block *block.Block) error
AddHeaders(...*block.Header) error
BlockHeight() uint32
HeaderHeight() uint32
}

// OperationMode is the mode of operation for the block queue.
// Could be either Blocking or NonBlocking.
type OperationMode byte

const (
// NonBlocking means that PutBlock will return immediately if the queue is full.
// NonBlocking means that Put will return immediately if the queue is full.
NonBlocking OperationMode = 0
// Blocking means that PutBlock will wait until there is enough space in the queue.
// Blocking means that Put will wait until there is enough space in the queue.
Blocking OperationMode = 1
)

// TypeMode is the mode of the queue, which determines whether it is a queue of headers or blocks.
// Could be either Headers or Blocks.
type TypeMode byte

const (
// Headers is the mode of the queue, where it is a queue of headers.
Headers TypeMode = 0
// Blocks is the mode of the queue, where it is a queue of blocks.
Blocks TypeMode = 1
)

// Queueble is an interface for a block or a header.
type Queueble interface {
GetIndex() uint32
}

// Queue is the block queue.
type Queue struct {
log *zap.Logger
queueLock sync.RWMutex
queue []*block.Block
queue []Queueble
lastQ uint32
checkBlocks chan struct{}
chain Blockqueuer
relayF func(*block.Block)
relayF func(queueble Queueble)
discarded atomic.Bool
len int
lenUpdateF func(int)
cacheSize int
mode OperationMode
getHeight func() uint32
}

// DefaultCacheSize is the default amount of blocks above the current height
// DefaultCacheSize is the default amount of blocks (or headers) above the current height
// which are stored in the queue.
const DefaultCacheSize = 2000

func (bq *Queue) indexToPosition(i uint32) int {
return int(i) % bq.cacheSize
}

// New creates an instance of BlockQueue.
func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue {
// New creates an instance of Queue that handles both blocks and headers.
func New(bc Blockqueuer, log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode, objectMode TypeMode) *Queue {
if log == nil {
return nil
}
if cacheSize <= 0 {
cacheSize = DefaultCacheSize
}
height := bc.BlockHeight
if objectMode == Headers {
height = bc.HeaderHeight
}
r, ok := relayer.(func(queueble Queueble))
if !ok {
r = nil
}

return &Queue{
log: log,
queue: make([]*block.Block, cacheSize),
queue: make([]Queueble, cacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: relayer,
relayF: r,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
getHeight: height,
}
}

// Run runs the BlockQueue queueing loop. It must be called in a separate routine.
// Run must be called in a separate goroutine; it processes both blocks and headers.
func (bq *Queue) Run() {
var lastHeight = bq.chain.BlockHeight()
var lastHeight = bq.getHeight()
for {
_, ok := <-bq.checkBlocks
if !ok {
break
}
for {
h := bq.chain.BlockHeight()
h := bq.getHeight()
pos := bq.indexToPosition(h + 1)
bq.queueLock.Lock()
b := bq.queue[pos]
// The chain moved forward using blocks from other sources (consensus).
for i := lastHeight; i < h; i++ {
old := bq.indexToPosition(i + 1)
if bq.queue[old] != nil && bq.queue[old].Index == i {
if bq.queue[old] != nil && bq.queue[old].GetIndex() == i {
bq.len--
bq.queue[old] = nil
}
Expand All @@ -99,14 +126,14 @@ func (bq *Queue) Run() {
break
}

err := bq.chain.AddBlock(b)
err := bq.add(b)
if err != nil {
// The block might already be added by the consensus.
if bq.chain.BlockHeight() < b.Index {
bq.log.Warn("blockQueue: failed adding block into the blockchain",
if bq.getHeight() < b.GetIndex() {
bq.log.Warn("Queue: failed adding item into the blockchain",
zap.String("error", err.Error()),
zap.Uint32("blockHeight", bq.chain.BlockHeight()),
zap.Uint32("nextIndex", b.Index))
zap.Uint32("Height", bq.getHeight()),
zap.Uint32("nextIndex", b.GetIndex()))
}
} else if bq.relayF != nil {
bq.relayF(b)
Expand All @@ -125,20 +152,40 @@ func (bq *Queue) Run() {
}
}

// PutBlock enqueues block to be added to the chain.
func (bq *Queue) PutBlock(block *block.Block) error {
h := bq.chain.BlockHeight()
func (bl *Queue) add(item Queueble) error {
if b, ok := item.(*block.Block); ok {
return bl.chain.AddBlock(b)
}
if h, ok := item.(*block.Header); ok {
return bl.chain.AddHeaders(h)
}
return nil
}

// Put enqueues block or header to be added to the chain.
func (bq *Queue) Put(item any) error {
if b, ok := item.(*block.Block); ok {
return bq.put(b)
}
if h, ok := item.(*block.Header); ok {
return bq.put(h)
}
return nil
}

func (bq *Queue) put(block Queueble) error {
h := bq.getHeight()
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
if bq.discarded.Load() {
return nil
}
// Can easily happen when fetching the same blocks from
// different peers, thus not considered as error.
if block.Index <= h {
if block.GetIndex() <= h {
return nil
}
if h+uint32(bq.cacheSize) < block.Index {
if h+uint32(bq.cacheSize) < block.GetIndex() {
switch bq.mode {
case NonBlocking:
return nil
Expand All @@ -151,21 +198,21 @@ func (bq *Queue) PutBlock(block *block.Block) error {
bq.queueLock.Lock()
return nil
}
h = bq.chain.BlockHeight()
if h+uint32(bq.cacheSize) >= block.Index {
h = bq.getHeight()
if h+uint32(bq.cacheSize) >= block.GetIndex() {
bq.queueLock.Lock()
break
}
}
}
}
pos := bq.indexToPosition(block.Index)
pos := bq.indexToPosition(block.GetIndex())
// If we already have it, keep the old block, throw away the new one.
if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index {
if bq.queue[pos] == nil || bq.queue[pos].GetIndex() < block.GetIndex() {
bq.len++
bq.queue[pos] = block
for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index {
bq.lastQ = bq.queue[pos].Index
for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].GetIndex() {
bq.lastQ = bq.queue[pos].GetIndex()
pos++
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/network/bqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
func TestBlockQueue(t *testing.T) {
chain := fakechain.NewFakeChain()
// notice, it's not yet running
bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking)
bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking, Blocks)
blocks := make([]*block.Block, 11)
for i := 1; i < 11; i++ {
blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}}
}
// not the ones expected currently
for i := 3; i < 5; i++ {
assert.NoError(t, bq.PutBlock(blocks[i]))
assert.NoError(t, bq.Put(blocks[i]))
}
last, capLeft := bq.LastQueued()
assert.Equal(t, uint32(0), last)
Expand All @@ -30,7 +30,7 @@ func TestBlockQueue(t *testing.T) {
assert.Equal(t, 2, bq.length())
// now added the expected ones (with duplicates)
for i := 1; i < 5; i++ {
assert.NoError(t, bq.PutBlock(blocks[i]))
assert.NoError(t, bq.Put(blocks[i]))
}
// but they're still not put into the blockchain, because bq isn't running
last, capLeft = bq.LastQueued()
Expand All @@ -39,7 +39,7 @@ func TestBlockQueue(t *testing.T) {
assert.Equal(t, uint32(0), chain.BlockHeight())
assert.Equal(t, 4, bq.length())
// block with too big index is dropped
assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}}))
assert.NoError(t, bq.Put(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}}))
assert.Equal(t, 4, bq.length())
go bq.Run()
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
Expand All @@ -51,26 +51,26 @@ func TestBlockQueue(t *testing.T) {
assert.Equal(t, uint32(4), chain.BlockHeight())
// put some old blocks
for i := 1; i < 5; i++ {
assert.NoError(t, bq.PutBlock(blocks[i]))
assert.NoError(t, bq.Put(blocks[i]))
}
last, capLeft = bq.LastQueued()
assert.Equal(t, uint32(4), last)
assert.Equal(t, DefaultCacheSize, capLeft)
assert.Equal(t, 0, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// unexpected blocks with run() active
assert.NoError(t, bq.PutBlock(blocks[8]))
assert.NoError(t, bq.Put(blocks[8]))
assert.Equal(t, 1, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.PutBlock(blocks[7]))
assert.NoError(t, bq.Put(blocks[7]))
assert.Equal(t, 2, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
// sparse put
assert.NoError(t, bq.PutBlock(blocks[10]))
assert.NoError(t, bq.Put(blocks[10]))
assert.Equal(t, 3, bq.length())
assert.Equal(t, uint32(4), chain.BlockHeight())
assert.NoError(t, bq.PutBlock(blocks[6]))
assert.NoError(t, bq.PutBlock(blocks[5]))
assert.NoError(t, bq.Put(blocks[6]))
assert.NoError(t, bq.Put(blocks[5]))
// run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one
assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond)
last, capLeft = bq.LastQueued()
Expand Down
13 changes: 6 additions & 7 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,15 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}
s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks)

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks)
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking, bqueue.Blocks)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock,
sync.OnceFunc(func() { close(s.blockFetcherFin) }))
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.Put, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err)
}
Expand Down Expand Up @@ -796,9 +795,9 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error {
return nil
}
if s.stateSync.IsActive() {
return s.bSyncQueue.PutBlock(block)
return s.bSyncQueue.Put(block)
}
return s.bQueue.PutBlock(block)
return s.bQueue.Put(block)
}

// handlePing processes a ping request.
Expand Down
Loading

0 comments on commit 30c2f9b

Please sign in to comment.