diff --git a/pkg/consensus/consensus.go b/pkg/consensus/consensus.go index 5f251c8ed2..e9ddb788d4 100644 --- a/pkg/consensus/consensus.go +++ b/pkg/consensus/consensus.go @@ -58,7 +58,7 @@ type Ledger interface { // BlockQueuer is an interface to the block queue manager sufficient for Service. type BlockQueuer interface { - PutBlock(block *coreb.Block) error + Put(block any) error } // Service represents a consensus instance. @@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error { bb := &b.(*neoBlock).Block bb.Script = *(s.getBlockWitness(bb)) - if err := s.BlockQueue.PutBlock(bb); err != nil { + if err := s.BlockQueue.Put(bb); err != nil { // The block might already be added via the regular network // interaction. if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil { diff --git a/pkg/consensus/consensus_test.go b/pkg/consensus/consensus_test.go index c7066aaca1..522ee184d9 100644 --- a/pkg/consensus/consensus_test.go +++ b/pkg/consensus/consensus_test.go @@ -528,8 +528,12 @@ type testBlockQueuer struct { var _ = BlockQueuer(testBlockQueuer{}) // PutBlock implements BlockQueuer interface. -func (bq testBlockQueuer) PutBlock(b *coreb.Block) error { - return bq.bc.AddBlock(b) +func (bq testBlockQueuer) Put(b any) error { + blk, ok := b.(*coreb.Block) + if !ok { + return nil + } + return bq.bc.AddBlock(blk) } func getTestValidator(i int) (*keys.PrivateKey, *keys.PublicKey) { diff --git a/pkg/core/block/block.go b/pkg/core/block/block.go index 9b246a4d3a..adae8c813f 100644 --- a/pkg/core/block/block.go +++ b/pkg/core/block/block.go @@ -51,6 +51,11 @@ type auxBlockIn struct { Transactions []json.RawMessage `json:"tx"` } +// GetIndex returns the index of the block. +func (b *Block) GetIndex() uint32 { + return b.Header.Index +} + // ComputeMerkleRoot computes Merkle tree root hash based on actual block's data. func (b *Block) ComputeMerkleRoot() util.Uint256 { hashes := make([]util.Uint256, len(b.Transactions)) diff --git a/pkg/core/block/header.go b/pkg/core/block/header.go index 2edd763dc5..dc606cb28c 100644 --- a/pkg/core/block/header.go +++ b/pkg/core/block/header.go @@ -80,6 +80,11 @@ type baseAux struct { Witnesses []transaction.Witness `json:"witnesses"` } +// GetIndex returns the index of the block. +func (b *Header) GetIndex() uint32 { + return b.Index +} + // Hash returns the hash of the block. Notice that it is cached internally, // so no matter how you change the [Header] after the first invocation of this // method it won't change. To get an updated hash in case you're changing diff --git a/pkg/core/statesync/module.go b/pkg/core/statesync/module.go index 4915a279fc..0a5f48301b 100644 --- a/pkg/core/statesync/module.go +++ b/pkg/core/statesync/module.go @@ -508,3 +508,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 { return s.mptpool.GetBatch(limit) } + +// HeaderHeight returns the height of the latest header. +func (s *Module) HeaderHeight() uint32 { + return s.bc.HeaderHeight() +} diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 2899b1ac51..9837d78e7f 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -14,6 +14,7 @@ type Blockqueuer interface { AddBlock(block *block.Block) error AddHeaders(...*block.Header) error BlockHeight() uint32 + HeaderHeight() uint32 } // OperationMode is the mode of operation for the block queue. @@ -21,29 +22,46 @@ type Blockqueuer interface { type OperationMode byte const ( - // NonBlocking means that PutBlock will return immediately if the queue is full. + // NonBlocking means that Put will return immediately if the queue is full. NonBlocking OperationMode = 0 - // Blocking means that PutBlock will wait until there is enough space in the queue. + // Blocking means that Put will wait until there is enough space in the queue. Blocking OperationMode = 1 ) +// TypeMode is the mode of the queue, which determines whether it is a queue of headers or blocks. +// Could be either Headers or Blocks. +type TypeMode byte + +const ( + // Headers is the mode of the queue, where it is a queue of headers. + Headers TypeMode = 0 + // Blocks is the mode of the queue, where it is a queue of blocks. + Blocks TypeMode = 1 +) + +// Queueble is an interface for a block or a header. +type Queueble interface { + GetIndex() uint32 +} + // Queue is the block queue. type Queue struct { log *zap.Logger queueLock sync.RWMutex - queue []*block.Block + queue []Queueble lastQ uint32 checkBlocks chan struct{} chain Blockqueuer - relayF func(*block.Block) + relayF func(queueble Queueble) discarded atomic.Bool len int lenUpdateF func(int) cacheSize int mode OperationMode + getHeight func() uint32 } -// DefaultCacheSize is the default amount of blocks above the current height +// DefaultCacheSize is the default amount of blocks (or headers) above the current height // which are stored in the queue. const DefaultCacheSize = 2000 @@ -51,44 +69,53 @@ func (bq *Queue) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } -// New creates an instance of BlockQueue. -func New(bc Blockqueuer, log *zap.Logger, relayer func(*block.Block), cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue { +// New creates an instance of Queue that handles both blocks and headers. +func New(bc Blockqueuer, log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode, objectMode TypeMode) *Queue { if log == nil { return nil } if cacheSize <= 0 { cacheSize = DefaultCacheSize } + height := bc.BlockHeight + if objectMode == Headers { + height = bc.HeaderHeight + } + r, ok := relayer.(func(queueble Queueble)) + if !ok { + r = nil + } return &Queue{ log: log, - queue: make([]*block.Block, cacheSize), + queue: make([]Queueble, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, - relayF: relayer, + relayF: r, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, mode: mode, + getHeight: height, } } -// Run runs the BlockQueue queueing loop. It must be called in a separate routine. +// Run must be called in a separate goroutine; it processes both blocks and headers. func (bq *Queue) Run() { - var lastHeight = bq.chain.BlockHeight() + var lastHeight = bq.getHeight() for { _, ok := <-bq.checkBlocks if !ok { break } for { - h := bq.chain.BlockHeight() + h := bq.getHeight() pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] // The chain moved forward using blocks from other sources (consensus). for i := lastHeight; i < h; i++ { old := bq.indexToPosition(i + 1) - if bq.queue[old] != nil && bq.queue[old].Index == i { + if bq.queue[old] != nil && bq.queue[old].GetIndex() == i { bq.len-- bq.queue[old] = nil } @@ -99,14 +126,14 @@ func (bq *Queue) Run() { break } - err := bq.chain.AddBlock(b) + err := bq.add(b) if err != nil { // The block might already be added by the consensus. - if bq.chain.BlockHeight() < b.Index { - bq.log.Warn("blockQueue: failed adding block into the blockchain", + if bq.getHeight() < b.GetIndex() { + bq.log.Warn("Queue: failed adding item into the blockchain", zap.String("error", err.Error()), - zap.Uint32("blockHeight", bq.chain.BlockHeight()), - zap.Uint32("nextIndex", b.Index)) + zap.Uint32("Height", bq.getHeight()), + zap.Uint32("nextIndex", b.GetIndex())) } } else if bq.relayF != nil { bq.relayF(b) @@ -125,9 +152,29 @@ func (bq *Queue) Run() { } } -// PutBlock enqueues block to be added to the chain. -func (bq *Queue) PutBlock(block *block.Block) error { - h := bq.chain.BlockHeight() +func (bl *Queue) add(item Queueble) error { + if b, ok := item.(*block.Block); ok { + return bl.chain.AddBlock(b) + } + if h, ok := item.(*block.Header); ok { + return bl.chain.AddHeaders(h) + } + return nil +} + +// Put enqueues block or header to be added to the chain. +func (bq *Queue) Put(item any) error { + if b, ok := item.(*block.Block); ok { + return bq.put(b) + } + if h, ok := item.(*block.Header); ok { + return bq.put(h) + } + return nil +} + +func (bq *Queue) put(block Queueble) error { + h := bq.getHeight() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { @@ -135,10 +182,10 @@ func (bq *Queue) PutBlock(block *block.Block) error { } // Can easily happen when fetching the same blocks from // different peers, thus not considered as error. - if block.Index <= h { + if block.GetIndex() <= h { return nil } - if h+uint32(bq.cacheSize) < block.Index { + if h+uint32(bq.cacheSize) < block.GetIndex() { switch bq.mode { case NonBlocking: return nil @@ -151,21 +198,21 @@ func (bq *Queue) PutBlock(block *block.Block) error { bq.queueLock.Lock() return nil } - h = bq.chain.BlockHeight() - if h+uint32(bq.cacheSize) >= block.Index { + h = bq.getHeight() + if h+uint32(bq.cacheSize) >= block.GetIndex() { bq.queueLock.Lock() break } } } } - pos := bq.indexToPosition(block.Index) + pos := bq.indexToPosition(block.GetIndex()) // If we already have it, keep the old block, throw away the new one. - if bq.queue[pos] == nil || bq.queue[pos].Index < block.Index { + if bq.queue[pos] == nil || bq.queue[pos].GetIndex() < block.GetIndex() { bq.len++ bq.queue[pos] = block - for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].Index { - bq.lastQ = bq.queue[pos].Index + for pos < bq.cacheSize && bq.queue[pos] != nil && bq.lastQ+1 == bq.queue[pos].GetIndex() { + bq.lastQ = bq.queue[pos].GetIndex() pos++ } } diff --git a/pkg/network/bqueue/queue_test.go b/pkg/network/bqueue/queue_test.go index e481fba56f..8413cc1f86 100644 --- a/pkg/network/bqueue/queue_test.go +++ b/pkg/network/bqueue/queue_test.go @@ -13,14 +13,14 @@ import ( func TestBlockQueue(t *testing.T) { chain := fakechain.NewFakeChain() // notice, it's not yet running - bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking) + bq := New(chain, zaptest.NewLogger(t), nil, 0, nil, NonBlocking, Blocks) blocks := make([]*block.Block, 11) for i := 1; i < 11; i++ { blocks[i] = &block.Block{Header: block.Header{Index: uint32(i)}} } // not the ones expected currently for i := 3; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } last, capLeft := bq.LastQueued() assert.Equal(t, uint32(0), last) @@ -30,7 +30,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 2, bq.length()) // now added the expected ones (with duplicates) for i := 1; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } // but they're still not put into the blockchain, because bq isn't running last, capLeft = bq.LastQueued() @@ -39,7 +39,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, uint32(0), chain.BlockHeight()) assert.Equal(t, 4, bq.length()) // block with too big index is dropped - assert.NoError(t, bq.PutBlock(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}})) + assert.NoError(t, bq.Put(&block.Block{Header: block.Header{Index: bq.chain.BlockHeight() + DefaultCacheSize + 1}})) assert.Equal(t, 4, bq.length()) go bq.Run() // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one @@ -51,7 +51,7 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, uint32(4), chain.BlockHeight()) // put some old blocks for i := 1; i < 5; i++ { - assert.NoError(t, bq.PutBlock(blocks[i])) + assert.NoError(t, bq.Put(blocks[i])) } last, capLeft = bq.LastQueued() assert.Equal(t, uint32(4), last) @@ -59,18 +59,18 @@ func TestBlockQueue(t *testing.T) { assert.Equal(t, 0, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // unexpected blocks with run() active - assert.NoError(t, bq.PutBlock(blocks[8])) + assert.NoError(t, bq.Put(blocks[8])) assert.Equal(t, 1, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.PutBlock(blocks[7])) + assert.NoError(t, bq.Put(blocks[7])) assert.Equal(t, 2, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) // sparse put - assert.NoError(t, bq.PutBlock(blocks[10])) + assert.NoError(t, bq.Put(blocks[10])) assert.Equal(t, 3, bq.length()) assert.Equal(t, uint32(4), chain.BlockHeight()) - assert.NoError(t, bq.PutBlock(blocks[6])) - assert.NoError(t, bq.PutBlock(blocks[5])) + assert.NoError(t, bq.Put(blocks[6])) + assert.NoError(t, bq.Put(blocks[5])) // run() is asynchronous, so we need some kind of timeout anyway and this is the simplest one assert.Eventually(t, func() bool { return chain.BlockHeight() == 8 }, 4*time.Second, 100*time.Millisecond) last, capLeft = bq.LastQueued() diff --git a/pkg/network/server.go b/pkg/network/server.go index b8998700c0..ab71a7498d 100644 --- a/pkg/network/server.go +++ b/pkg/network/server.go @@ -221,16 +221,15 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy } s.bQueue = bqueue.New(chain, log, func(b *block.Block) { s.tryStartServices() - }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks) - s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking) + s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks) if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 { s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize } - s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking) + s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking, bqueue.Blocks) var err error - s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.PutBlock, - sync.OnceFunc(func() { close(s.blockFetcherFin) })) + s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.Put, sync.OnceFunc(func() { close(s.blockFetcherFin) })) if err != nil { return nil, fmt.Errorf("failed to create NeoFS BlockFetcher: %w", err) } @@ -796,9 +795,9 @@ func (s *Server) handleBlockCmd(p Peer, block *block.Block) error { return nil } if s.stateSync.IsActive() { - return s.bSyncQueue.PutBlock(block) + return s.bSyncQueue.Put(block) } - return s.bQueue.PutBlock(block) + return s.bQueue.Put(block) } // handlePing processes a ping request. diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 854827c6eb..73f728529f 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -59,7 +59,7 @@ type Service struct { chain Ledger pool poolWrapper - enqueueBlock func(*block.Block) error + enqueueBlock func(any) error account *wallet.Account oidsCh chan oid.ID @@ -81,7 +81,7 @@ type Service struct { } // New creates a new BlockFetcher Service. -func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(*block.Block) error, shutdownCallback func()) (*Service, error) { +func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(any) error, shutdownCallback func()) (*Service, error) { var ( account *wallet.Account err error