Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Headers fetching via NeoFS BlockFetcher service #3789

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type FakeStateSync struct {
AddMPTNodesFunc func(nodes [][]byte) error
}

// HeaderHeight returns the height of the latest stored header.
func (s *FakeStateSync) HeaderHeight() uint32 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exported method needs a comment.

return 0
}

// NewFakeChain returns a new FakeChain structure.
func NewFakeChain() *FakeChain {
return NewFakeChainWithCustomCfg(nil)
Expand Down Expand Up @@ -447,6 +452,9 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
// NeedHeaders implements the StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }

// NeedBlocks implements the StateSync interface.
func (s *FakeStateSync) NeedBlocks() bool { return false }

// NeedMPTNodes implements the StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool {
panic("TODO")
Expand All @@ -464,3 +472,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}

// SetOnStageChanged implements the StateSync interface.
func (s *FakeStateSync) SetOnStageChanged(func()) {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 2 additions & 0 deletions pkg/config/protocol_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// NeoFSStateSyncExtensions enables state data exchange logic via NeoFS.
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`

Expand Down
4 changes: 2 additions & 2 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Ledger interface {

// BlockQueuer is an interface to the block queue manager sufficient for Service.
type BlockQueuer interface {
PutBlock(block *coreb.Block) error
Put(queueable *coreb.Block) error
}

// Service represents a consensus instance.
Expand Down Expand Up @@ -623,7 +623,7 @@ func (s *service) processBlock(b dbft.Block[util.Uint256]) error {
bb := &b.(*neoBlock).Block
bb.Script = *(s.getBlockWitness(bb))

if err := s.BlockQueue.PutBlock(bb); err != nil {
if err := s.BlockQueue.Put(bb); err != nil {
// The block might already be added via the regular network
// interaction.
if _, errget := s.Chain.GetBlock(bb.Hash()); errget != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ type testBlockQueuer struct {
var _ = BlockQueuer(testBlockQueuer{})

// PutBlock implements BlockQueuer interface.
func (bq testBlockQueuer) PutBlock(b *coreb.Block) error {
func (bq testBlockQueuer) Put(b *coreb.Block) error {
return bq.bc.AddBlock(b)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type auxBlockIn struct {
Transactions []json.RawMessage `json:"tx"`
}

// GetIndex returns the index of the block.
func (b *Block) GetIndex() uint32 {
return b.Index
}

// ComputeMerkleRoot computes Merkle tree root hash based on actual block's data.
func (b *Block) ComputeMerkleRoot() util.Uint256 {
hashes := make([]util.Uint256, len(b.Transactions))
Expand Down
15 changes: 15 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,21 @@
Witnesses []transaction.Witness `json:"witnesses"`
}

// GetIndex returns the index of the block.
func (b *Header) GetIndex() uint32 {
return b.Index

Check warning on line 85 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

Comment on lines +83 to +87
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to avoid that, but generics in Go don't support structural types for now, ref. golang/go#51259. We'll probably need Roman's ACK for these additional methods over Header, but for now let's keep this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roman-khimov what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly the same problem in SDK/node interactions. This should be documented appropriately to be used for interfaces only, but I think it's acceptable if we really save some code on this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And don't forget to create an issue for further removal of this method if one day golang/go#51259 will be resolved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Header) GetExpectedHeaderSize() int {
size := expectedHeaderSizeWithEmptyWitness - 1 - 1 + // 1 is for the zero-length (new(Header)).Script.Invocation/Verification
io.GetVarSize(&b.Script)
if b.StateRootEnabled {
size += util.Uint256Size
}

Check warning on line 94 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L93-L94

Added lines #L93 - L94 were not covered by tests
return size
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down
16 changes: 16 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@
log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions {
return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously")
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
Expand All @@ -297,6 +300,19 @@
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.NeoFSStateSyncExtensions {
if !cfg.NeoFSBlockFetcher.Enabled {

Check warning on line 304 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L303-L304

Added lines #L303 - L304 were not covered by tests
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("NeoFSStateSyncExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
}
if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
}
Expand Down
69 changes: 67 additions & 2 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@
billet *mpt.Billet

jumpCallback func(p uint32) error

// stageChangedCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageChangedCallback func()
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
if !bc.GetConfig().Ledger.RemoveUntraceableBlocks || (!bc.GetConfig().P2PStateExchangeExtensions && !bc.GetConfig().NeoFSStateSyncExtensions) {
return &Module{
dao: s,
bc: bc,
Expand All @@ -120,9 +124,14 @@
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage != none {
return errors.New("already initialized or inactive")
}
Expand Down Expand Up @@ -176,6 +185,20 @@
return s.defineSyncStage()
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
func (s *Module) SetOnStageChanged(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()
s.stageChangedCallback = cb

Check warning on line 192 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L189-L192

Added lines #L189 - L192 were not covered by tests
}

// notifyStageChanged triggers stage change callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageChangedCallback != nil {
s.stageChangedCallback()
}

Check warning on line 199 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L198-L199

Added lines #L198 - L199 were not covered by tests
}

// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
Expand All @@ -192,6 +215,12 @@
// defineSyncStage sequentially checks and sets sync state process stage after Module
// initialization. It also performs initialization of MPT Billet if necessary.
func (s *Module) defineSyncStage() error {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
// check headers sync stage first
ltstHeaderHeight := s.bc.HeaderHeight()
if ltstHeaderHeight > s.syncPoint {
Expand Down Expand Up @@ -306,7 +335,13 @@

// AddBlock verifies and saves block skipping executable scripts.
func (s *Module) AddBlock(block *block.Block) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&blocksSynced != 0 {
Expand Down Expand Up @@ -359,7 +394,13 @@
// AddMPTNodes tries to add provided set of MPT nodes to the MPT billet if they are
// not yet collected.
func (s *Module) AddMPTNodes(nodes [][]byte) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
Expand Down Expand Up @@ -425,6 +466,12 @@
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
// should take care of it.
func (s *Module) checkSyncIsCompleted() {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
if s.syncStage != headersSynced|mptSynced|blocksSynced {
return
}
Expand Down Expand Up @@ -484,6 +531,14 @@
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
}

// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
func (s *Module) NeedBlocks() bool {
s.lock.RLock()
defer s.lock.RUnlock()

return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0

Check warning on line 539 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L535-L539

Added lines #L535 - L539 were not covered by tests
}

// Traverse traverses local MPT nodes starting from the specified root down to its
// children calling `process` for each serialised node until stop condition is satisfied.
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
Expand All @@ -508,3 +563,13 @@

return s.mptpool.GetBatch(limit)
}

// HeaderHeight returns the height of the latest stored header.
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()

Check warning on line 574 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L573-L574

Added lines #L573 - L574 were not covered by tests
}
Loading
Loading