Skip to content

Commit

Permalink
node/meta: Use neo-go's new notification API (#3164)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Feb 21, 2025
2 parents 69e69fb + f85af2f commit 8934873
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 167 deletions.
101 changes: 101 additions & 0 deletions pkg/services/meta/blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package meta

import (
"context"
"fmt"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"go.uber.org/zap"
)

func (m *Meta) handleBlock(b *block.Header) error {
h := b.Hash()
ind := b.Index
l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind))
l.Debug("handling block")

evName := objPutEvName
m.cliM.RLock()
res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{
Contract: &m.cnrH,
Name: &evName,
})
if err != nil {
m.cliM.RUnlock()
return fmt.Errorf("fetching %s block: %w", h, err)
}
m.cliM.RUnlock()

if len(res.Application) == 0 {
return nil
}

m.m.RLock()
defer m.m.RUnlock()

for _, n := range res.Application {
ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue
}

s, ok := m.storages[ev.cID]
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue
}

err = m.handleObjectNotification(s, ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
continue
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

l.Debug("handled block successfully")

return nil
}

func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) {
for {
select {
case <-ctx.Done():
return
case b := <-buff:
err := m.handleBlock(b)
if err != nil {
m.l.Error("block handling failed", zap.Error(err))
continue
}
}
}
}
File renamed without changes.
56 changes: 35 additions & 21 deletions pkg/services/meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import (

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"go.uber.org/zap"
Expand All @@ -32,6 +33,17 @@ type ContainerLister interface {
List() (map[cid.ID]struct{}, error)
}

// wsClient is for test purposes only.
type wsClient interface {
GetBlockNotifications(blockHash util.Uint256, filters ...*neorpc.NotificationFilter) (*result.BlockNotifications, error)
GetVersion() (*result.Version, error)

ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error)
ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error)

Close()
}

// Meta handles object meta information received from FS chain and object
// storages. Chain information is stored in Merkle-Patricia Tries. Full objects
// index is built and stored as a simple KV storage.
Expand All @@ -47,21 +59,21 @@ type Meta struct {

timeout time.Duration
magicNumber uint32
ws *rpcclient.WSClient
cliM sync.RWMutex
ws wsClient
bCh chan *block.Header
objEv chan *state.ContainedNotificationEvent
cnrDelEv chan *state.ContainedNotificationEvent
cnrPutEv chan *state.ContainedNotificationEvent
epochEv chan *state.ContainedNotificationEvent

objNotificationBuff chan *state.ContainedNotificationEvent
blockBuff chan *block.Header

// runtime reload fields
cfgM sync.RWMutex
endpoints []string
}

const objsBufferSize = 1024
const blockBuffSize = 1024

// Parameters groups arguments for [New] call.
type Parameters struct {
Expand Down Expand Up @@ -136,20 +148,19 @@ func New(p Parameters) (*Meta, error) {
}

return &Meta{
l: p.Logger,
rootPath: p.RootPath,
netmapH: p.NetmapHash,
cnrH: p.ContainerHash,
cLister: p.ContainerLister,
endpoints: p.NeoEnpoints,
timeout: p.Timeout,
bCh: make(chan *block.Header),
objEv: make(chan *state.ContainedNotificationEvent),
cnrDelEv: make(chan *state.ContainedNotificationEvent),
cnrPutEv: make(chan *state.ContainedNotificationEvent),
epochEv: make(chan *state.ContainedNotificationEvent),
objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize),
storages: storages}, nil
l: p.Logger,
rootPath: p.RootPath,
netmapH: p.NetmapHash,
cnrH: p.ContainerHash,
cLister: p.ContainerLister,
endpoints: p.NeoEnpoints,
timeout: p.Timeout,
bCh: make(chan *block.Header),
cnrDelEv: make(chan *state.ContainedNotificationEvent),
cnrPutEv: make(chan *state.ContainedNotificationEvent),
epochEv: make(chan *state.ContainedNotificationEvent),
blockBuff: make(chan *block.Header, blockBuffSize),
storages: storages}, nil
}

// Reload updates service in runtime.
Expand Down Expand Up @@ -198,13 +209,16 @@ func (m *Meta) Run(ctx context.Context) error {
}

go m.flusher(ctx)
go m.objNotificationWorker(ctx, m.objNotificationBuff)
go m.blockFetcher(ctx, m.blockBuff)

return m.listenNotifications(ctx)
}

func (m *Meta) flusher(ctx context.Context) {
const flushInterval = time.Second
const (
flushInterval = time.Second
collapseDepth = 10
)

t := time.NewTicker(flushInterval)
defer t.Stop()
Expand Down
115 changes: 7 additions & 108 deletions pkg/services/meta/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"github.com/nspcc-dev/neo-go/pkg/core/block"
"github.com/nspcc-dev/neo-go/pkg/core/mpt"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
Expand All @@ -32,12 +31,6 @@ func (m *Meta) subscribeForMeta() error {
return fmt.Errorf("subscribe for block headers: %w", err)
}

objEv := objPutEvName
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &objEv}, m.objEv)
if err != nil {
return fmt.Errorf("subscribe for object notifications: %w", err)
}

cnrDeleteEv := cnrDeleteName
_, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv)
if err != nil {
Expand Down Expand Up @@ -72,26 +65,7 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
continue
}

go func() {
err := m.handleBlock(h.Index)
if err != nil {
m.l.Error(fmt.Sprintf("processing %d block", h.Index), zap.Error(err))
return
}
}()
case aer, ok := <-m.objEv:
if !ok {
err := m.reconnect(ctx)
if err != nil {
return err
}

continue
}

// TODO: https://github.com/nspcc-dev/neo-go/issues/3779 receive somehow notifications from blocks

m.objNotificationBuff <- aer
m.blockBuff <- h
case aer, ok := <-m.cnrDelEv:
if !ok {
err := m.reconnect(ctx)
Expand Down Expand Up @@ -192,14 +166,16 @@ func (m *Meta) listenNotifications(ctx context.Context) error {
func (m *Meta) reconnect(ctx context.Context) error {
m.l.Warn("reconnecting to web socket client due to connection lost")

m.cliM.Lock()
defer m.cliM.Unlock()

var err error
m.ws, err = m.connect(ctx)
if err != nil {
return fmt.Errorf("reconnecting to web socket: %w", err)
}

m.bCh = make(chan *block.Header)
m.objEv = make(chan *state.ContainedNotificationEvent)
m.cnrDelEv = make(chan *state.ContainedNotificationEvent)
m.cnrPutEv = make(chan *state.ContainedNotificationEvent)
m.epochEv = make(chan *state.ContainedNotificationEvent)
Expand Down Expand Up @@ -252,80 +228,6 @@ outer:
return cli, nil
}

const (
collapseDepth = 10
)

func (m *Meta) handleBlock(ind uint32) error {
l := m.l.With(zap.Uint32("block", ind))
l.Debug("handling block")

m.m.RLock()
defer m.m.RUnlock()

for _, st := range m.storages {
// TODO: parallelize depending on what can parallelize well

st.m.Lock()

root := st.mpt.StateRoot()
st.mpt.Store.Put([]byte{rootKey}, root[:])
p := st.path
if st.opsBatch != nil {
_, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch))
if err != nil {
st.m.Unlock()
return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err)
}

st.opsBatch = nil
}

st.m.Unlock()

st.mpt.Flush(ind)
}

// TODO drop containers that node does not belong to anymore?

l.Debug("handled block successfully")

return nil
}

func (m *Meta) objNotificationWorker(ctx context.Context, ch <-chan *state.ContainedNotificationEvent) {
for {
select {
case <-ctx.Done():
return
case n := <-ch:
l := m.l.With(zap.Stringer("notification container", n.Container))

ev, err := parseObjNotification(n)
if err != nil {
l.Error("invalid object notification received", zap.Error(err))
continue
}

m.m.RLock()
_, ok := m.storages[ev.cID]
m.m.RUnlock()
if !ok {
l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID))
continue
}

err = m.handleObjectNotification(ev)
if err != nil {
l.Error("handling object notification", zap.Error(err))
return
}

l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID))
}
}
}

const (
// MPT key prefixes.
oidIndex = iota
Expand Down Expand Up @@ -365,7 +267,7 @@ type objEvent struct {
typ objectsdk.Type
}

func parseObjNotification(ev *state.ContainedNotificationEvent) (objEvent, error) {
func parseObjNotification(ev state.ContainedNotificationEvent) (objEvent, error) {
const expectedNotificationArgs = 3
var res objEvent

Expand Down Expand Up @@ -487,15 +389,12 @@ func getFromMap(m *stackitem.Map, key string) stackitem.Item {
return m.Value().([]stackitem.MapElement)[i].Value
}

func (m *Meta) handleObjectNotification(e objEvent) error {
func (m *Meta) handleObjectNotification(s *containerStorage, e objEvent) error {
if magic := uint32(e.network.Uint64()); magic != m.magicNumber {
return fmt.Errorf("wrong magic number %d, expected: %d", magic, m.magicNumber)
}

m.m.RLock()
defer m.m.RUnlock()

err := m.storages[e.cID].putObject(e)
err := s.putObject(e)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8934873

Please sign in to comment.