diff --git a/pkg/network/bqueue/queue.go b/pkg/network/bqueue/queue.go index 9837d78e7f..008adfbc20 100644 --- a/pkg/network/bqueue/queue.go +++ b/pkg/network/bqueue/queue.go @@ -5,16 +5,14 @@ import ( "sync/atomic" "time" - "github.com/nspcc-dev/neo-go/pkg/core/block" "go.uber.org/zap" ) // Blockqueuer is an interface for a block queue. -type Blockqueuer interface { - AddBlock(block *block.Block) error - AddHeaders(...*block.Header) error - BlockHeight() uint32 - HeaderHeight() uint32 +type Blockqueuer[Q Queueable] interface { + AddItem(Q Queueable) error + AddItems(...Q) error + Height() uint32 } // OperationMode is the mode of operation for the block queue. @@ -28,87 +26,71 @@ const ( Blocking OperationMode = 1 ) -// TypeMode is the mode of the queue, which determines whether it is a queue of headers or blocks. -// Could be either Headers or Blocks. -type TypeMode byte - -const ( - // Headers is the mode of the queue, where it is a queue of headers. - Headers TypeMode = 0 - // Blocks is the mode of the queue, where it is a queue of blocks. - Blocks TypeMode = 1 -) - -// Queueble is an interface for a block or a header. -type Queueble interface { +// Queueable is an interface for a block or a header. +type Queueable interface { GetIndex() uint32 } // Queue is the block queue. -type Queue struct { +type Queue[Q Queueable] struct { log *zap.Logger queueLock sync.RWMutex - queue []Queueble + queue []Queueable lastQ uint32 checkBlocks chan struct{} - chain Blockqueuer - relayF func(queueble Queueble) + chain Blockqueuer[Q] + relayF func(queueble Queueable) discarded atomic.Bool len int lenUpdateF func(int) cacheSize int mode OperationMode - getHeight func() uint32 } // DefaultCacheSize is the default amount of blocks (or headers) above the current height // which are stored in the queue. const DefaultCacheSize = 2000 -func (bq *Queue) indexToPosition(i uint32) int { +func (bq *Queue[Queueable]) indexToPosition(i uint32) int { return int(i) % bq.cacheSize } // New creates an instance of Queue that handles both blocks and headers. -func New(bc Blockqueuer, log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode, objectMode TypeMode) *Queue { +func New[Q Queueable](bc Blockqueuer[Q], log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue[Q] { if log == nil { return nil } if cacheSize <= 0 { cacheSize = DefaultCacheSize } - height := bc.BlockHeight - if objectMode == Headers { - height = bc.HeaderHeight - } - r, ok := relayer.(func(queueble Queueble)) + + r, ok := relayer.(func(queueble Queueable)) if !ok { r = nil } - return &Queue{ + return &Queue[Q]{ log: log, - queue: make([]Queueble, cacheSize), + queue: make([]Queueable, cacheSize), checkBlocks: make(chan struct{}, 1), chain: bc, relayF: r, lenUpdateF: lenMetricsUpdater, cacheSize: cacheSize, mode: mode, - getHeight: height, } } // Run must be called in a separate goroutine; it processes both blocks and headers. -func (bq *Queue) Run() { - var lastHeight = bq.getHeight() +func (bq *Queue[Q]) Run() { + var lastHeight = bq.chain.Height() for { _, ok := <-bq.checkBlocks if !ok { break } for { - h := bq.getHeight() + h := bq.chain.Height() pos := bq.indexToPosition(h + 1) bq.queueLock.Lock() b := bq.queue[pos] @@ -126,13 +108,13 @@ func (bq *Queue) Run() { break } - err := bq.add(b) + err := bq.chain.AddItem(b) if err != nil { // The block might already be added by the consensus. - if bq.getHeight() < b.GetIndex() { + if bq.chain.Height() < b.GetIndex() { bq.log.Warn("Queue: failed adding item into the blockchain", zap.String("error", err.Error()), - zap.Uint32("Height", bq.getHeight()), + zap.Uint32("Height", bq.chain.Height()), zap.Uint32("nextIndex", b.GetIndex())) } } else if bq.relayF != nil { @@ -152,29 +134,9 @@ func (bq *Queue) Run() { } } -func (bl *Queue) add(item Queueble) error { - if b, ok := item.(*block.Block); ok { - return bl.chain.AddBlock(b) - } - if h, ok := item.(*block.Header); ok { - return bl.chain.AddHeaders(h) - } - return nil -} - // Put enqueues block or header to be added to the chain. -func (bq *Queue) Put(item any) error { - if b, ok := item.(*block.Block); ok { - return bq.put(b) - } - if h, ok := item.(*block.Header); ok { - return bq.put(h) - } - return nil -} - -func (bq *Queue) put(block Queueble) error { - h := bq.getHeight() +func (bq *Queue[Q]) Put(block Queueable) error { + h := bq.chain.Height() bq.queueLock.Lock() defer bq.queueLock.Unlock() if bq.discarded.Load() { @@ -198,7 +160,7 @@ func (bq *Queue) put(block Queueble) error { bq.queueLock.Lock() return nil } - h = bq.getHeight() + h = bq.chain.Height() if h+uint32(bq.cacheSize) >= block.GetIndex() { bq.queueLock.Lock() break @@ -231,14 +193,14 @@ func (bq *Queue) put(block Queueble) error { // LastQueued returns the index of the last queued block and the queue's capacity // left. -func (bq *Queue) LastQueued() (uint32, int) { +func (bq *Queue[Q]) LastQueued() (uint32, int) { bq.queueLock.RLock() defer bq.queueLock.RUnlock() return bq.lastQ, bq.cacheSize - bq.len } // Discard stops the queue and prevents it from accepting more blocks to enqueue. -func (bq *Queue) Discard() { +func (bq *Queue[Q]) Discard() { if bq.discarded.CompareAndSwap(false, true) { bq.queueLock.Lock() close(bq.checkBlocks)