Skip to content

Commit

Permalink
network: integrate state sync module with blockfetcher
Browse files Browse the repository at this point in the history
Close #3574

Signed-off-by: Ekaterina Pavlova <[email protected]>
  • Loading branch information
AliceInHunterland committed Jan 29, 2025
1 parent d840b05 commit 646625b
Show file tree
Hide file tree
Showing 11 changed files with 304 additions and 47 deletions.
21 changes: 17 additions & 4 deletions internal/fakechain/fakechain.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ type FakeStateSync struct {
AddMPTNodesFunc func(nodes [][]byte) error
}

func (s *FakeStateSync) HeaderHeight() uint32 {
return s.HeaderHeight()
}

// NewFakeChain returns a new FakeChain structure.
func NewFakeChain() *FakeChain {
return NewFakeChainWithCustomCfg(nil)
Expand Down Expand Up @@ -432,6 +428,11 @@ func (s *FakeStateSync) BlockHeight() uint32 {
return 0
}

// HeaderHeight implements the StateSync interface.
func (s *FakeStateSync) HeaderHeight() uint32 {
return 0
}

// IsActive implements the StateSync interface.
func (s *FakeStateSync) IsActive() bool { return s.IsActiveFlag.Load() }

Expand All @@ -451,6 +452,8 @@ func (s *FakeStateSync) Init(currChainHeight uint32) error {
// NeedHeaders implements the StateSync interface.
func (s *FakeStateSync) NeedHeaders() bool { return s.RequestHeaders.Load() }

func (s *FakeStateSync) NeedBlocks() bool { return false }

// NeedMPTNodes implements the StateSync interface.
func (s *FakeStateSync) NeedMPTNodes() bool {
panic("TODO")
Expand All @@ -468,3 +471,13 @@ func (s *FakeStateSync) Traverse(root util.Uint256, process func(node mpt.Node,
func (s *FakeStateSync) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
panic("TODO")
}

// GetConfig implements the StateSync interface.
func (s *FakeStateSync) GetConfig() config.Blockchain {
panic("TODO")
}

// SetOnStageChanged implements the StateSync interface.
func (s *FakeStateSync) SetOnStageChanged(func()) {
panic("TODO")
}
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (c Config) Blockchain() Blockchain {
return Blockchain{
ProtocolConfiguration: c.ProtocolConfiguration,
Ledger: c.ApplicationConfiguration.Ledger,
NeoFSBlockFetcher: c.ApplicationConfiguration.NeoFSBlockFetcher,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/config/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Ledger struct {
type Blockchain struct {
ProtocolConfiguration
Ledger
NeoFSBlockFetcher
}
2 changes: 2 additions & 0 deletions pkg/config/protocol_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type (
P2PSigExtensions bool `yaml:"P2PSigExtensions"`
// P2PStateExchangeExtensions enables additional P2P MPT state data exchange logic.
P2PStateExchangeExtensions bool `yaml:"P2PStateExchangeExtensions"`
// NeoFSStateSyncExtensions enables state data exchange logic via NeoFS.
NeoFSStateSyncExtensions bool `yaml:"NeoFSStateSyncExtensions"`
// ReservedAttributes allows to have reserved attributes range for experimental or private purposes.
ReservedAttributes bool `yaml:"ReservedAttributes"`

Expand Down
10 changes: 10 additions & 0 deletions pkg/core/block/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func (b *Header) GetIndex() uint32 {
return b.Index

Check warning on line 85 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L84-L85

Added lines #L84 - L85 were not covered by tests
}

// GetExpectedHeaderSize returns the expected header size with empty witness.
func (b *Header) GetExpectedHeaderSize() int {
size := expectedHeaderSizeWithEmptyWitness - 1 - 1 + // 1 is for the zero-length (new(Header)).Script.Invocation/Verification
io.GetVarSize(&b.Script)
if b.StateRootEnabled {
size += util.Uint256Size
}

Check warning on line 94 in pkg/core/block/header.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/block/header.go#L93-L94

Added lines #L93 - L94 were not covered by tests
return size
}

// Hash returns the hash of the block. Notice that it is cached internally,
// so no matter how you change the [Header] after the first invocation of this
// method it won't change. To get an updated hash in case you're changing
Expand Down
16 changes: 16 additions & 0 deletions pkg/core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
log.Info("MaxValidUntilBlockIncrement is not set or wrong, using default value",
zap.Uint32("MaxValidUntilBlockIncrement", cfg.MaxValidUntilBlockIncrement))
}
if cfg.P2PStateExchangeExtensions && cfg.NeoFSStateSyncExtensions {
return nil, errors.New("P2PStateExchangeExtensions and NeoFSStateSyncExtensions cannot be enabled simultaneously")
}
if cfg.P2PStateExchangeExtensions {
if !cfg.StateRootInHeader {
return nil, errors.New("P2PStatesExchangeExtensions are enabled, but StateRootInHeader is off")
Expand All @@ -298,6 +301,19 @@ func NewBlockchain(s storage.Store, cfg config.Blockchain, log *zap.Logger) (*Bl
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.NeoFSStateSyncExtensions {
if !cfg.NeoFSBlockFetcher.Enabled {

Check warning on line 305 in pkg/core/blockchain.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/blockchain.go#L304-L305

Added lines #L304 - L305 were not covered by tests
return nil, errors.New("NeoFSStateSyncExtensions are enabled, but NeoFSBlockFetcher is off")
}
if cfg.KeepOnlyLatestState && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("NeoFSStateSyncExtensions can be enabled either on MPT-complete node (KeepOnlyLatestState=false) or on light GC-enabled node (RemoveUntraceableBlocks=true)")
}
if cfg.StateSyncInterval <= 0 {
cfg.StateSyncInterval = defaultStateSyncInterval
log.Info("StateSyncInterval is not set or wrong, using default value",
zap.Int("StateSyncInterval", cfg.StateSyncInterval))
}
}
if cfg.RemoveUntraceableHeaders && !cfg.RemoveUntraceableBlocks {
return nil, errors.New("RemoveUntraceableHeaders is enabled, but RemoveUntraceableBlocks is not")
}
Expand Down
64 changes: 62 additions & 2 deletions pkg/core/statesync/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@ type Module struct {
billet *mpt.Billet

jumpCallback func(p uint32) error

// stageChangedCallback is an optional callback that is triggered whenever
// the sync stage changes.
stageChangedCallback func()
}

// NewModule returns new instance of statesync module.
func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Simple, jumpCallback func(p uint32) error) *Module {
if !(bc.GetConfig().P2PStateExchangeExtensions && bc.GetConfig().Ledger.RemoveUntraceableBlocks) {
if !bc.GetConfig().Ledger.RemoveUntraceableBlocks || (!bc.GetConfig().P2PStateExchangeExtensions && !bc.GetConfig().NeoFSStateSyncExtensions) {
return &Module{
dao: s,
bc: bc,
Expand All @@ -120,9 +124,14 @@ func NewModule(bc Ledger, stateMod *stateroot.Module, log *zap.Logger, s *dao.Si
// Init initializes state sync module for the current chain's height with given
// callback for MPT nodes requests.
func (s *Module) Init(currChainHeight uint32) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage != none {
return errors.New("already initialized or inactive")
}
Expand Down Expand Up @@ -176,6 +185,20 @@ func (s *Module) Init(currChainHeight uint32) error {
return s.defineSyncStage()
}

// SetOnStageChanged sets callback that is triggered whenever the sync stage changes.
func (s *Module) SetOnStageChanged(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()
s.stageChangedCallback = cb

Check warning on line 192 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L189-L192

Added lines #L189 - L192 were not covered by tests
}

// notifyStageChanged triggers stage change callback if it's set.
func (s *Module) notifyStageChanged() {
if s.stageChangedCallback != nil {
s.stageChangedCallback()
}

Check warning on line 199 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L198-L199

Added lines #L198 - L199 were not covered by tests
}

// TemporaryPrefix accepts current storage prefix and returns prefix
// to use for storing intermediate items during synchronization.
func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
Expand All @@ -192,6 +215,12 @@ func TemporaryPrefix(currPrefix storage.KeyPrefix) storage.KeyPrefix {
// defineSyncStage sequentially checks and sets sync state process stage after Module
// initialization. It also performs initialization of MPT Billet if necessary.
func (s *Module) defineSyncStage() error {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
// check headers sync stage first
ltstHeaderHeight := s.bc.HeaderHeight()
if ltstHeaderHeight > s.syncPoint {
Expand Down Expand Up @@ -305,7 +334,13 @@ func (s *Module) AddHeaders(hdrs ...*block.Header) error {

// AddBlock verifies and saves block skipping executable scripts.
func (s *Module) AddBlock(block *block.Block) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&blocksSynced != 0 {
Expand Down Expand Up @@ -358,7 +393,13 @@ func (s *Module) AddBlock(block *block.Block) error {
// AddMPTNodes tries to add provided set of MPT nodes to the MPT billet if they are
// not yet collected.
func (s *Module) AddMPTNodes(nodes [][]byte) error {
oldStage := s.syncStage
s.lock.Lock()
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
defer s.lock.Unlock()

if s.syncStage&headersSynced == 0 || s.syncStage&mptSynced != 0 {
Expand Down Expand Up @@ -424,6 +465,12 @@ func (s *Module) restoreNode(n mpt.Node) error {
// If so, then jumping to P state sync point occurs. It is not protected by lock, thus caller
// should take care of it.
func (s *Module) checkSyncIsCompleted() {
oldStage := s.syncStage
defer func() {
if s.syncStage != oldStage {
s.notifyStageChanged()
}
}()
if s.syncStage != headersSynced|mptSynced|blocksSynced {
return
}
Expand Down Expand Up @@ -483,6 +530,14 @@ func (s *Module) NeedMPTNodes() bool {
return s.syncStage&headersSynced != 0 && s.syncStage&mptSynced == 0
}

// NeedBlocks returns whether the module hasn't completed blocks synchronisation.
func (s *Module) NeedBlocks() bool {
s.lock.RLock()
defer s.lock.RUnlock()

return s.syncStage&headersSynced != 0 && s.syncStage&blocksSynced == 0

Check warning on line 538 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L534-L538

Added lines #L534 - L538 were not covered by tests
}

// Traverse traverses local MPT nodes starting from the specified root down to its
// children calling `process` for each serialised node until stop condition is satisfied.
func (s *Module) Traverse(root util.Uint256, process func(node mpt.Node, nodeBytes []byte) bool) error {
Expand Down Expand Up @@ -512,3 +567,8 @@ func (s *Module) GetUnknownMPTNodesBatch(limit int) []util.Uint256 {
func (s *Module) HeaderHeight() uint32 {
return s.bc.HeaderHeight()
}

// GetConfig returns current blockchain configuration.
func (s *Module) GetConfig() config.Blockchain {
return s.bc.GetConfig()

Check warning on line 573 in pkg/core/statesync/module.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/statesync/module.go#L572-L573

Added lines #L572 - L573 were not covered by tests
}
Loading

0 comments on commit 646625b

Please sign in to comment.