Skip to content

Commit

Permalink
*: refactor for generic queue version
Browse files Browse the repository at this point in the history
Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 28, 2025
1 parent ef6089a commit 3f4140f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 26 deletions.
3 changes: 2 additions & 1 deletion pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/address"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
npayload "github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/util"
Expand Down Expand Up @@ -58,7 +59,7 @@ type Ledger interface {

// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
Put(block any) error
Put(block bqueue.Queueable) error
}

// Service represents a consensus instance.
Expand Down
16 changes: 15 additions & 1 deletion pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/encoding/fixedn"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/smartcontract"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/callflag"
"github.com/nspcc-dev/neo-go/pkg/smartcontract/manifest"
Expand Down Expand Up @@ -128,7 +129,7 @@ var (
// adding new blocks or headers.
type Blockchain struct {
HeaderHashes

bqueue.Blockqueuer[bqueue.Queueable]
config config.Blockchain

// The only way chain state changes is by adding blocks, so we can't
Expand Down Expand Up @@ -1608,6 +1609,19 @@ func (bc *Blockchain) GetStateSyncModule() *statesync.Module {
return statesync.NewModule(bc, bc.stateRoot, bc.log, bc.dao, bc.jumpToState)
}

func (bc *Blockchain) AddItem(b bqueue.Queueable) error {
return bc.AddBlock(b.(*block.Block))
}

func (bc *Blockchain) AddItems(hdrs ...bqueue.Queueable) error {
for _, h := range hdrs {
if err := bc.AddHeaders(h.(*block.Header)); err != nil {
return err
}
}
return nil
}

// storeBlock performs chain update using the block given, it executes all
// transactions with all appropriate side-effects and updates Blockchain state.
// This is the only way to change Blockchain state.
Expand Down
30 changes: 27 additions & 3 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/core/stateroot"
"github.com/nspcc-dev/neo-go/pkg/core/storage"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -62,7 +63,7 @@ const (

// Ledger is the interface required from Blockchain for Module to operate.
type Ledger interface {
AddHeaders(...*block.Header) error
bqueue.Blockqueuer[bqueue.Queueable]
BlockHeight() uint32
GetConfig() config.Blockchain
GetHeader(hash util.Uint256) (*block.Header, error)
Expand Down Expand Up @@ -95,6 +96,24 @@ type Module struct {
jumpCallback func(p uint32) error
}

func (s *Module) AddItem(b bqueue.Queueable) error {
return s.AddBlock(b.(*block.Block))

}

func (s *Module) AddItems(hdrs ...bqueue.Queueable) error {
for _, h := range hdrs {
if err := s.AddHeaders(h.(*block.Header)); err != nil {
return err
}
}
return nil
}

func (s *Module) Height() uint32 {
return s.BlockHeight()
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
Expand Down Expand Up @@ -293,8 +312,13 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {
if s.syncStage != initialized {
return errors.New("headers were not requested")
}

hdrsErr := s.bc.AddHeaders(hdrs...)
var hdrsErr error
for _, hdr := range hdrs {
hdrsErr = s.AddItems(hdr)
if hdrsErr != nil {
break
}
}
if s.bc.HeaderHeight() > s.syncPoint {
err := s.defineSyncStage()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Blockqueuer is an interface for a block queue.
type Blockqueuer[Q Queueable] interface {
AddItem(Q Queueable) error
AddItem(Q) error
AddItems(...Q) error
Height() uint32
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (bq *Queue[Q]) Run() {
break
}

err := bq.chain.AddItem(b)
err := bq.chain.AddItem(b.(Q))
if err != nil {
// The block might already be added by the consensus.
if bq.chain.Height() < b.GetIndex() {
Expand Down
78 changes: 62 additions & 16 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type (
Ledger interface {
extpool.Ledger
mempool.Feer
bqueue.Blockqueuer
bqueue.Blockqueuer[bqueue.Queueable]
GetBlock(hash util.Uint256) (*block.Block, error)
GetConfig() config.Blockchain
GetHeader(hash util.Uint256) (*block.Header, error)
Expand Down Expand Up @@ -102,9 +102,9 @@ type (
transports []Transporter
discovery Discoverer
chain Ledger
bQueue *bqueue.Queue
bSyncQueue *bqueue.Queue
bFetcherQueue *bqueue.Queue
bQueue *bqueue.Queue[*block.Block]
bSyncQueue *bqueue.Queue[*block.Block]
bFetcherQueue *bqueue.Queue[*block.Block]
mempool *mempool.Pool
notaryRequestPool *mempool.Pool
extensiblePool *extpool.Pool
Expand Down Expand Up @@ -219,15 +219,13 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
}, s.notaryFeer)
})
}
s.bQueue = bqueue.New(chain, log, func(b *block.Block) {
s.tryStartServices()
}, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks)
s.bQueue = bqueue.New[*block.Block](chainAdapter{chain}, log, func(b *block.Block) { s.tryStartServices() }, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)

s.bSyncQueue = bqueue.New(s.stateSync, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking, bqueue.Blocks)
s.bSyncQueue = bqueue.New[*block.Block](blockAdapter{s.stateSync}, log, nil, bqueue.DefaultCacheSize, updateBlockQueueLenMetric, bqueue.NonBlocking)
if s.NeoFSBlockFetcherCfg.BQueueSize <= 0 {
s.NeoFSBlockFetcherCfg.BQueueSize = blockfetcher.DefaultQueueCacheSize
}
s.bFetcherQueue = bqueue.New(chain, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking, bqueue.Blocks)
s.bFetcherQueue = bqueue.New[*block.Block](chainAdapter{chain}, log, nil, s.NeoFSBlockFetcherCfg.BQueueSize, updateBlockQueueLenMetric, bqueue.Blocking)
var err error
s.blockFetcher, err = blockfetcher.New(chain, s.NeoFSBlockFetcherCfg, log, s.bFetcherQueue.Put, sync.OnceFunc(func() { close(s.blockFetcherFin) }))
if err != nil {
Expand Down Expand Up @@ -281,6 +279,48 @@ func newServerFromConstructors(config ServerConfig, chain Ledger, stSync StateSy
return s, nil
}

type blockAdapter struct {
bc StateSync
}

func (a blockAdapter) AddItem(b *block.Block) error {
return a.bc.AddItem(b)
}

func (a blockAdapter) AddItems(bs ...*block.Block) error {
for _, b := range bs {
if err := a.bc.AddItem(b); err != nil {
return err
}
}
return nil
}

func (a blockAdapter) Height() uint32 {
return a.bc.Height()
}

type chainAdapter struct {
chain Ledger
}

func (a chainAdapter) AddItem(b *block.Block) error {
return a.chain.AddItem(b)
}

func (a chainAdapter) AddItems(bs ...*block.Block) error {
for _, b := range bs {
if err := a.chain.AddItem(b); err != nil {
return err
}
}
return nil
}

func (a chainAdapter) Height() uint32 {
return a.chain.Height()
}

// ID returns the servers ID.
func (s *Server) ID() uint32 {
return s.id
Expand Down Expand Up @@ -373,7 +413,7 @@ func (s *Server) addService(svc Service) {
}

// GetBlockQueue returns the block queue instance managed by Server.
func (s *Server) GetBlockQueue() *bqueue.Queue {
func (s *Server) GetBlockQueue() *bqueue.Queue[*block.Block] {
return s.bQueue
}

Expand Down Expand Up @@ -824,14 +864,14 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
return nil
}
var (
bq bqueue.Blockqueuer = s.chain
bq bqueue.Blockqueuer[*block.Block] = chainAdapter{s.chain}
requestMPTNodes bool
)
if s.stateSync.IsActive() {
bq = s.stateSync
bq = blockAdapter{s.stateSync}
requestMPTNodes = s.stateSync.NeedMPTNodes()
}
if bq.BlockHeight() >= p.LastBlockIndex() {
if bq.Height() >= p.LastBlockIndex() {
return nil
}
err := s.requestBlocks(bq, p)
Expand Down Expand Up @@ -1138,7 +1178,13 @@ func (s *Server) handleHeadersCmd(p Peer, h *payload.Headers) error {
if s.blockFetcher.IsActive() {
return nil
}
return s.stateSync.AddHeaders(h.Hdrs...)
for _, header := range h.Hdrs {
err := s.stateSync.AddItems(header)
if err != nil {
return err
}
}
return nil
}

// handleExtensibleCmd processes the received extensible payload.
Expand Down Expand Up @@ -1333,8 +1379,8 @@ func (s *Server) handleGetAddrCmd(p Peer) error {
// 1. Block range is divided into chunks of payload.MaxHashesCount.
// 2. Send requests for chunk in increasing order.
// 3. After all requests have been sent, request random height.
func (s *Server) requestBlocks(bq bqueue.Blockqueuer, p Peer) error {
pl := getRequestBlocksPayload(p, bq.BlockHeight(), &s.lastRequestedBlock)
func (s *Server) requestBlocks(bq bqueue.Blockqueuer[*block.Block], p Peer) error {
pl := getRequestBlocksPayload(p, bq.Height(), &s.lastRequestedBlock)
lq, capLeft := s.bQueue.LastQueued()
if capLeft == 0 {
// No more blocks will fit into the queue.
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// StateSync represents state sync module.
type StateSync interface {
AddMPTNodes([][]byte) error
bqueue.Blockqueuer
bqueue.Blockqueuer[bqueue.Queueable]
Init(currChainHeight uint32) error
IsActive() bool
IsInitialized() bool
Expand Down
5 changes: 3 additions & 2 deletions pkg/services/blockfetcher/blockfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/nspcc-dev/neo-go/pkg/config"
"github.com/nspcc-dev/neo-go/pkg/core/block"
gio "github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/network/bqueue"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
"github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
Expand Down Expand Up @@ -59,7 +60,7 @@ type Service struct {

chain Ledger
pool poolWrapper
enqueueBlock func(any) error
enqueueBlock func(block bqueue.Queueable) error
account *wallet.Account

oidsCh chan oid.ID
Expand All @@ -81,7 +82,7 @@ type Service struct {
}

// New creates a new BlockFetcher Service.
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(any) error, shutdownCallback func()) (*Service, error) {
func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, putBlock func(block bqueue.Queueable) error, shutdownCallback func()) (*Service, error) {
var (
account *wallet.Account
err error
Expand Down

0 comments on commit 3f4140f

Please sign in to comment.