Skip to content

Commit

Permalink
bqueue: make queue generic
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 28, 2025
1 parent 30c2f9b commit ef6089a
Showing 1 changed file with 27 additions and 65 deletions.
92 changes: 27 additions & 65 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ import (
"sync/atomic"
"time"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"go.uber.org/zap"
)

// Blockqueuer is an interface for a block queue.
type Blockqueuer interface {
AddBlock(block *block.Block) error
AddHeaders(...*block.Header) error
BlockHeight() uint32
HeaderHeight() uint32
type Blockqueuer[Q Queueable] interface {
AddItem(Q Queueable) error
AddItems(...Q) error
Height() uint32
}

// OperationMode is the mode of operation for the block queue.
Expand All @@ -28,87 +26,71 @@ const (
Blocking OperationMode = 1
)

// TypeMode is the mode of the queue, which determines whether it is a queue of headers or blocks.
// Could be either Headers or Blocks.
type TypeMode byte

const (
// Headers is the mode of the queue, where it is a queue of headers.
Headers TypeMode = 0
// Blocks is the mode of the queue, where it is a queue of blocks.
Blocks TypeMode = 1
)

// Queueble is an interface for a block or a header.
type Queueble interface {
// Queueable is an interface for a block or a header.
type Queueable interface {
GetIndex() uint32
}

// Queue is the block queue.
type Queue struct {
type Queue[Q Queueable] struct {
log *zap.Logger
queueLock sync.RWMutex
queue []Queueble
queue []Queueable
lastQ uint32
checkBlocks chan struct{}
chain Blockqueuer
relayF func(queueble Queueble)
chain Blockqueuer[Q]
relayF func(queueble Queueable)
discarded atomic.Bool
len int
lenUpdateF func(int)
cacheSize int
mode OperationMode
getHeight func() uint32
}

// DefaultCacheSize is the default amount of blocks (or headers) above the current height
// which are stored in the queue.
const DefaultCacheSize = 2000

func (bq *Queue) indexToPosition(i uint32) int {
func (bq *Queue[Queueable]) indexToPosition(i uint32) int {
return int(i) % bq.cacheSize
}

// New creates an instance of Queue that handles both blocks and headers.
func New(bc Blockqueuer, log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode, objectMode TypeMode) *Queue {
func New[Q Queueable](bc Blockqueuer[Q], log *zap.Logger, relayer any, cacheSize int, lenMetricsUpdater func(l int), mode OperationMode) *Queue[Q] {
if log == nil {
return nil
}
if cacheSize <= 0 {
cacheSize = DefaultCacheSize
}
height := bc.BlockHeight
if objectMode == Headers {
height = bc.HeaderHeight
}
r, ok := relayer.(func(queueble Queueble))

r, ok := relayer.(func(queueble Queueable))
if !ok {
r = nil
}

return &Queue{
return &Queue[Q]{
log: log,
queue: make([]Queueble, cacheSize),
queue: make([]Queueable, cacheSize),
checkBlocks: make(chan struct{}, 1),
chain: bc,
relayF: r,
lenUpdateF: lenMetricsUpdater,
cacheSize: cacheSize,
mode: mode,
getHeight: height,
}
}

// Run must be called in a separate goroutine; it processes both blocks and headers.
func (bq *Queue) Run() {
var lastHeight = bq.getHeight()
func (bq *Queue[Q]) Run() {
var lastHeight = bq.chain.Height()
for {
_, ok := <-bq.checkBlocks
if !ok {
break
}
for {
h := bq.getHeight()
h := bq.chain.Height()
pos := bq.indexToPosition(h + 1)
bq.queueLock.Lock()
b := bq.queue[pos]
Expand All @@ -126,13 +108,13 @@ func (bq *Queue) Run() {
break
}

err := bq.add(b)
err := bq.chain.AddItem(b)
if err != nil {
// The block might already be added by the consensus.
if bq.getHeight() < b.GetIndex() {
if bq.chain.Height() < b.GetIndex() {
bq.log.Warn("Queue: failed adding item into the blockchain",
zap.String("error", err.Error()),
zap.Uint32("Height", bq.getHeight()),
zap.Uint32("Height", bq.chain.Height()),
zap.Uint32("nextIndex", b.GetIndex()))
}
} else if bq.relayF != nil {
Expand All @@ -152,29 +134,9 @@ func (bq *Queue) Run() {
}
}

func (bl *Queue) add(item Queueble) error {
if b, ok := item.(*block.Block); ok {
return bl.chain.AddBlock(b)
}
if h, ok := item.(*block.Header); ok {
return bl.chain.AddHeaders(h)
}
return nil
}

// Put enqueues block or header to be added to the chain.
func (bq *Queue) Put(item any) error {
if b, ok := item.(*block.Block); ok {
return bq.put(b)
}
if h, ok := item.(*block.Header); ok {
return bq.put(h)
}
return nil
}

func (bq *Queue) put(block Queueble) error {
h := bq.getHeight()
func (bq *Queue[Q]) Put(block Queueable) error {
h := bq.chain.Height()
bq.queueLock.Lock()
defer bq.queueLock.Unlock()
if bq.discarded.Load() {
Expand All @@ -198,7 +160,7 @@ func (bq *Queue) put(block Queueble) error {
bq.queueLock.Lock()
return nil
}
h = bq.getHeight()
h = bq.chain.Height()
if h+uint32(bq.cacheSize) >= block.GetIndex() {
bq.queueLock.Lock()
break
Expand Down Expand Up @@ -231,14 +193,14 @@ func (bq *Queue) put(block Queueble) error {

// LastQueued returns the index of the last queued block and the queue's capacity
// left.
func (bq *Queue) LastQueued() (uint32, int) {
func (bq *Queue[Q]) LastQueued() (uint32, int) {
bq.queueLock.RLock()
defer bq.queueLock.RUnlock()
return bq.lastQ, bq.cacheSize - bq.len
}

// Discard stops the queue and prevents it from accepting more blocks to enqueue.
func (bq *Queue) Discard() {
func (bq *Queue[Q]) Discard() {
if bq.discarded.CompareAndSwap(false, true) {
bq.queueLock.Lock()
close(bq.checkBlocks)
Expand Down

0 comments on commit ef6089a

Please sign in to comment.