From 4004e3814867c6eda096c325440a3cdc76dd342b Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 21 Feb 2025 12:14:19 +0300 Subject: [PATCH 1/3] node/meta: drop senseless comment about deleted containers It has already been implemented via deleted container notifications. Signed-off-by: Pavel Karpy --- pkg/services/meta/notifications.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/services/meta/notifications.go b/pkg/services/meta/notifications.go index 876a5733ce..a252e67805 100644 --- a/pkg/services/meta/notifications.go +++ b/pkg/services/meta/notifications.go @@ -286,8 +286,6 @@ func (m *Meta) handleBlock(ind uint32) error { st.mpt.Flush(ind) } - // TODO drop containers that node does not belong to anymore? - l.Debug("handled block successfully") return nil From 96ef86e466389e174ec63aff2621447f8133d090 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 21 Feb 2025 00:00:17 +0300 Subject: [PATCH 2/3] node/meta: Use neo-go's new notification API Receive object notifications by block and process them in sync, not independently. This is important for a consistent MPT state. Strictly speaking, it was a bug before but it was a bug that was impossible to fix without neo-go's new `getblocknotifications` API available with 0.108.0 release. Closes #3142. Signed-off-by: Pavel Karpy --- pkg/services/meta/meta.go | 56 +++++++----- pkg/services/meta/notifications.go | 111 +++++++++++------------- pkg/services/meta/notifications_test.go | 102 ++++++++++++++-------- 3 files changed, 149 insertions(+), 120 deletions(-) diff --git a/pkg/services/meta/meta.go b/pkg/services/meta/meta.go index 34c06334e6..e776a56f52 100644 --- a/pkg/services/meta/meta.go +++ b/pkg/services/meta/meta.go @@ -9,7 +9,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/block" "github.com/nspcc-dev/neo-go/pkg/core/state" - "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "go.uber.org/zap" @@ -32,6 +33,17 @@ type ContainerLister interface { List() (map[cid.ID]struct{}, error) } +// wsClient is for test purposes only. +type wsClient interface { + GetBlockNotifications(blockHash util.Uint256, filters ...*neorpc.NotificationFilter) (*result.BlockNotifications, error) + GetVersion() (*result.Version, error) + + ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) + ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) + + Close() +} + // Meta handles object meta information received from FS chain and object // storages. Chain information is stored in Merkle-Patricia Tries. Full objects // index is built and stored as a simple KV storage. @@ -47,21 +59,21 @@ type Meta struct { timeout time.Duration magicNumber uint32 - ws *rpcclient.WSClient + cliM sync.RWMutex + ws wsClient bCh chan *block.Header - objEv chan *state.ContainedNotificationEvent cnrDelEv chan *state.ContainedNotificationEvent cnrPutEv chan *state.ContainedNotificationEvent epochEv chan *state.ContainedNotificationEvent - objNotificationBuff chan *state.ContainedNotificationEvent + blockBuff chan *block.Header // runtime reload fields cfgM sync.RWMutex endpoints []string } -const objsBufferSize = 1024 +const blockBuffSize = 1024 // Parameters groups arguments for [New] call. type Parameters struct { @@ -136,20 +148,19 @@ func New(p Parameters) (*Meta, error) { } return &Meta{ - l: p.Logger, - rootPath: p.RootPath, - netmapH: p.NetmapHash, - cnrH: p.ContainerHash, - cLister: p.ContainerLister, - endpoints: p.NeoEnpoints, - timeout: p.Timeout, - bCh: make(chan *block.Header), - objEv: make(chan *state.ContainedNotificationEvent), - cnrDelEv: make(chan *state.ContainedNotificationEvent), - cnrPutEv: make(chan *state.ContainedNotificationEvent), - epochEv: make(chan *state.ContainedNotificationEvent), - objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize), - storages: storages}, nil + l: p.Logger, + rootPath: p.RootPath, + netmapH: p.NetmapHash, + cnrH: p.ContainerHash, + cLister: p.ContainerLister, + endpoints: p.NeoEnpoints, + timeout: p.Timeout, + bCh: make(chan *block.Header), + cnrDelEv: make(chan *state.ContainedNotificationEvent), + cnrPutEv: make(chan *state.ContainedNotificationEvent), + epochEv: make(chan *state.ContainedNotificationEvent), + blockBuff: make(chan *block.Header, blockBuffSize), + storages: storages}, nil } // Reload updates service in runtime. @@ -198,13 +209,16 @@ func (m *Meta) Run(ctx context.Context) error { } go m.flusher(ctx) - go m.objNotificationWorker(ctx, m.objNotificationBuff) + go m.blockFetcher(ctx, m.blockBuff) return m.listenNotifications(ctx) } func (m *Meta) flusher(ctx context.Context) { - const flushInterval = time.Second + const ( + flushInterval = time.Second + collapseDepth = 10 + ) t := time.NewTicker(flushInterval) defer t.Stop() diff --git a/pkg/services/meta/notifications.go b/pkg/services/meta/notifications.go index a252e67805..a13780c3af 100644 --- a/pkg/services/meta/notifications.go +++ b/pkg/services/meta/notifications.go @@ -32,12 +32,6 @@ func (m *Meta) subscribeForMeta() error { return fmt.Errorf("subscribe for block headers: %w", err) } - objEv := objPutEvName - _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &objEv}, m.objEv) - if err != nil { - return fmt.Errorf("subscribe for object notifications: %w", err) - } - cnrDeleteEv := cnrDeleteName _, err = m.ws.ReceiveExecutionNotifications(&neorpc.NotificationFilter{Contract: &m.cnrH, Name: &cnrDeleteEv}, m.cnrDelEv) if err != nil { @@ -72,26 +66,7 @@ func (m *Meta) listenNotifications(ctx context.Context) error { continue } - go func() { - err := m.handleBlock(h.Index) - if err != nil { - m.l.Error(fmt.Sprintf("processing %d block", h.Index), zap.Error(err)) - return - } - }() - case aer, ok := <-m.objEv: - if !ok { - err := m.reconnect(ctx) - if err != nil { - return err - } - - continue - } - - // TODO: https://github.com/nspcc-dev/neo-go/issues/3779 receive somehow notifications from blocks - - m.objNotificationBuff <- aer + m.blockBuff <- h case aer, ok := <-m.cnrDelEv: if !ok { err := m.reconnect(ctx) @@ -192,6 +167,9 @@ func (m *Meta) listenNotifications(ctx context.Context) error { func (m *Meta) reconnect(ctx context.Context) error { m.l.Warn("reconnecting to web socket client due to connection lost") + m.cliM.Lock() + defer m.cliM.Unlock() + var err error m.ws, err = m.connect(ctx) if err != nil { @@ -199,7 +177,6 @@ func (m *Meta) reconnect(ctx context.Context) error { } m.bCh = make(chan *block.Header) - m.objEv = make(chan *state.ContainedNotificationEvent) m.cnrDelEv = make(chan *state.ContainedNotificationEvent) m.cnrPutEv = make(chan *state.ContainedNotificationEvent) m.epochEv = make(chan *state.ContainedNotificationEvent) @@ -252,17 +229,50 @@ outer: return cli, nil } -const ( - collapseDepth = 10 -) - -func (m *Meta) handleBlock(ind uint32) error { - l := m.l.With(zap.Uint32("block", ind)) +func (m *Meta) handleBlock(b *block.Header) error { + h := b.Hash() + ind := b.Index + l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind)) l.Debug("handling block") + evName := objPutEvName + res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{ + Contract: &m.cnrH, + Name: &evName, + }) + if err != nil { + return fmt.Errorf("fetching %s block: %w", h, err) + } + + if len(res.Application) == 0 { + return nil + } + m.m.RLock() defer m.m.RUnlock() + for _, n := range res.Application { + ev, err := parseObjNotification(n) + if err != nil { + l.Error("invalid object notification received", zap.Error(err)) + continue + } + + s, ok := m.storages[ev.cID] + if !ok { + l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID)) + continue + } + + err = m.handleObjectNotification(s, ev) + if err != nil { + l.Error("handling object notification", zap.Error(err)) + continue + } + + l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID)) + } + for _, st := range m.storages { // TODO: parallelize depending on what can parallelize well @@ -291,35 +301,17 @@ func (m *Meta) handleBlock(ind uint32) error { return nil } -func (m *Meta) objNotificationWorker(ctx context.Context, ch <-chan *state.ContainedNotificationEvent) { +func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) { for { select { case <-ctx.Done(): return - case n := <-ch: - l := m.l.With(zap.Stringer("notification container", n.Container)) - - ev, err := parseObjNotification(n) + case b := <-buff: + err := m.handleBlock(b) if err != nil { - l.Error("invalid object notification received", zap.Error(err)) - continue - } - - m.m.RLock() - _, ok := m.storages[ev.cID] - m.m.RUnlock() - if !ok { - l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID)) + m.l.Error("block handling failed", zap.Error(err)) continue } - - err = m.handleObjectNotification(ev) - if err != nil { - l.Error("handling object notification", zap.Error(err)) - return - } - - l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID)) } } } @@ -363,7 +355,7 @@ type objEvent struct { typ objectsdk.Type } -func parseObjNotification(ev *state.ContainedNotificationEvent) (objEvent, error) { +func parseObjNotification(ev state.ContainedNotificationEvent) (objEvent, error) { const expectedNotificationArgs = 3 var res objEvent @@ -485,15 +477,12 @@ func getFromMap(m *stackitem.Map, key string) stackitem.Item { return m.Value().([]stackitem.MapElement)[i].Value } -func (m *Meta) handleObjectNotification(e objEvent) error { +func (m *Meta) handleObjectNotification(s *containerStorage, e objEvent) error { if magic := uint32(e.network.Uint64()); magic != m.magicNumber { return fmt.Errorf("wrong magic number %d, expected: %d", magic, m.magicNumber) } - m.m.RLock() - defer m.m.RUnlock() - - err := m.storages[e.cID].putObject(e) + err := s.putObject(e) if err != nil { return err } diff --git a/pkg/services/meta/notifications_test.go b/pkg/services/meta/notifications_test.go index f3d38097bd..30caffeb8b 100644 --- a/pkg/services/meta/notifications_test.go +++ b/pkg/services/meta/notifications_test.go @@ -7,6 +7,7 @@ import ( "maps" "math/big" "os" + "sync" "testing" "time" @@ -14,6 +15,8 @@ import ( "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/core/storage" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/nspcc-dev/neo-go/pkg/neorpc/result" "github.com/nspcc-dev/neo-go/pkg/util" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -90,18 +93,57 @@ func checkDBFiles(t *testing.T, path string, cnrs map[cid.ID]struct{}) { }, 5*time.Second, time.Millisecond*100, "expected to find db files") } -func createAndRunTestMeta(t *testing.T) (*Meta, func(), chan struct{}) { +type testWS struct { + m sync.RWMutex + notifications []state.ContainedNotificationEvent + err error +} + +func (t *testWS) swapResults(notifications []state.ContainedNotificationEvent, err error) { + t.m.Lock() + defer t.m.Unlock() + + t.notifications = notifications + t.err = err +} + +func (t *testWS) GetBlockNotifications(blockHash util.Uint256, filters ...*neorpc.NotificationFilter) (*result.BlockNotifications, error) { + t.m.RLock() + defer t.m.RUnlock() + + return &result.BlockNotifications{ + Application: t.notifications, + }, t.err +} + +func (t *testWS) GetVersion() (*result.Version, error) { + panic("not expected for now") +} + +func (t *testWS) ReceiveHeadersOfAddedBlocks(flt *neorpc.BlockFilter, rcvr chan<- *block.Header) (string, error) { + panic("not expected for now") +} + +func (t *testWS) ReceiveExecutionNotifications(flt *neorpc.NotificationFilter, rcvr chan<- *state.ContainedNotificationEvent) (string, error) { + panic("not expected for now") +} + +func (t *testWS) Close() { + panic("not expected for now") +} + +func createAndRunTestMeta(t *testing.T, ws wsClient) (*Meta, func(), chan struct{}) { ctx, cancel := context.WithCancel(context.Background()) m := &Meta{ - l: zaptest.NewLogger(t), - rootPath: t.TempDir(), - magicNumber: 102938475, - bCh: make(chan *block.Header), - objEv: make(chan *state.ContainedNotificationEvent), - cnrDelEv: make(chan *state.ContainedNotificationEvent), - cnrPutEv: make(chan *state.ContainedNotificationEvent), - epochEv: make(chan *state.ContainedNotificationEvent), - objNotificationBuff: make(chan *state.ContainedNotificationEvent, objsBufferSize), + l: zaptest.NewLogger(t), + rootPath: t.TempDir(), + magicNumber: 102938475, + bCh: make(chan *block.Header), + cnrDelEv: make(chan *state.ContainedNotificationEvent), + cnrPutEv: make(chan *state.ContainedNotificationEvent), + epochEv: make(chan *state.ContainedNotificationEvent), + blockBuff: make(chan *block.Header, blockBuffSize), + ws: ws, // no-op, to be filled by test cases if needed storages: make(map[cid.ID]*containerStorage), @@ -115,7 +157,7 @@ func createAndRunTestMeta(t *testing.T) (*Meta, func(), chan struct{}) { exitCh := make(chan struct{}) go m.flusher(ctx) - go m.objNotificationWorker(ctx, m.objNotificationBuff) + go m.blockFetcher(ctx, m.blockBuff) go func() { _ = m.listenNotifications(ctx) exitCh <- struct{}{} @@ -221,7 +263,8 @@ func checkObject(t *testing.T, m *Meta, cID cid.ID, oID, firstPart, previousPart } func TestObjectPut(t *testing.T) { - m, stop, exitCh := createAndRunTestMeta(t) + ws := testWS{} + m, stop, exitCh := createAndRunTestMeta(t, &ws) t.Cleanup(func() { stop() <-exitCh @@ -285,12 +328,13 @@ func TestObjectPut(t *testing.T) { } }() - m.objEv <- &state.ContainedNotificationEvent{ + ws.swapResults(append(ws.notifications, state.ContainedNotificationEvent{ NotificationEvent: state.NotificationEvent{ Name: objPutEvName, Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(cID[:]), stackitem.Make(oID[:]), metaStack}), }, - } + }), nil) + m.bCh <- &block.Header{Index: 0} require.Eventually(t, func() bool { return checkObject(t, m, cID, oID, fPart, pPart, size, typ, deleted, nil, testVUB, m.magicNumber) @@ -306,32 +350,13 @@ func TestObjectPut(t *testing.T) { metaStack, err := stackitem.Deserialize(metaRaw) require.NoError(t, err) - stopTest := make(chan struct{}) - t.Cleanup(func() { - close(stopTest) - }) - go func() { - var i uint32 = 1 - tick := time.NewTicker(100 * time.Millisecond) - for { - select { - case <-tick.C: - m.bCh <- &block.Header{ - Index: i, - } - i++ - case <-stopTest: - return - } - } - }() - - m.objEv <- &state.ContainedNotificationEvent{ + ws.swapResults(append(ws.notifications, state.ContainedNotificationEvent{ NotificationEvent: state.NotificationEvent{ Name: objPutEvName, Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(cID[:]), stackitem.Make(objToDeleteOID[:]), metaStack}), }, - } + }), nil) + m.bCh <- &block.Header{Index: 0} require.Eventually(t, func() bool { return checkObject(t, m, cID, objToDeleteOID, oid.ID{}, oid.ID{}, size, objectsdk.TypeRegular, nil, nil, testVUB, m.magicNumber) @@ -346,12 +371,13 @@ func TestObjectPut(t *testing.T) { metaStack, err = stackitem.Deserialize(metaRaw) require.NoError(t, err) - m.objEv <- &state.ContainedNotificationEvent{ + ws.swapResults(append(ws.notifications, state.ContainedNotificationEvent{ NotificationEvent: state.NotificationEvent{ Name: objPutEvName, Item: stackitem.NewArray([]stackitem.Item{stackitem.Make(tsCID[:]), stackitem.Make(tsOID[:]), metaStack}), }, - } + }), nil) + m.bCh <- &block.Header{Index: 0} require.Eventually(t, func() bool { m.m.RLock() From f85af2f21ef43ff1090f3f3f512761e210c22019 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 21 Feb 2025 12:07:45 +0300 Subject: [PATCH 3/3] node/meta: move block related things in a separate file Also rename `container.go` to `containers.go` since we already have `blocks.go` and `notifications.go` with the same meaning. Signed-off-by: Pavel Karpy --- pkg/services/meta/blocks.go | 101 ++++++++++++++++++ .../meta/{container.go => containers.go} | 0 pkg/services/meta/notifications.go | 88 --------------- 3 files changed, 101 insertions(+), 88 deletions(-) create mode 100644 pkg/services/meta/blocks.go rename pkg/services/meta/{container.go => containers.go} (100%) diff --git a/pkg/services/meta/blocks.go b/pkg/services/meta/blocks.go new file mode 100644 index 0000000000..8169589f35 --- /dev/null +++ b/pkg/services/meta/blocks.go @@ -0,0 +1,101 @@ +package meta + +import ( + "context" + "fmt" + + "github.com/nspcc-dev/neo-go/pkg/core/block" + "github.com/nspcc-dev/neo-go/pkg/core/mpt" + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "go.uber.org/zap" +) + +func (m *Meta) handleBlock(b *block.Header) error { + h := b.Hash() + ind := b.Index + l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind)) + l.Debug("handling block") + + evName := objPutEvName + m.cliM.RLock() + res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{ + Contract: &m.cnrH, + Name: &evName, + }) + if err != nil { + m.cliM.RUnlock() + return fmt.Errorf("fetching %s block: %w", h, err) + } + m.cliM.RUnlock() + + if len(res.Application) == 0 { + return nil + } + + m.m.RLock() + defer m.m.RUnlock() + + for _, n := range res.Application { + ev, err := parseObjNotification(n) + if err != nil { + l.Error("invalid object notification received", zap.Error(err)) + continue + } + + s, ok := m.storages[ev.cID] + if !ok { + l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID)) + continue + } + + err = m.handleObjectNotification(s, ev) + if err != nil { + l.Error("handling object notification", zap.Error(err)) + continue + } + + l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID)) + } + + for _, st := range m.storages { + // TODO: parallelize depending on what can parallelize well + + st.m.Lock() + + root := st.mpt.StateRoot() + st.mpt.Store.Put([]byte{rootKey}, root[:]) + p := st.path + if st.opsBatch != nil { + _, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch)) + if err != nil { + st.m.Unlock() + return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err) + } + + st.opsBatch = nil + } + + st.m.Unlock() + + st.mpt.Flush(ind) + } + + l.Debug("handled block successfully") + + return nil +} + +func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) { + for { + select { + case <-ctx.Done(): + return + case b := <-buff: + err := m.handleBlock(b) + if err != nil { + m.l.Error("block handling failed", zap.Error(err)) + continue + } + } + } +} diff --git a/pkg/services/meta/container.go b/pkg/services/meta/containers.go similarity index 100% rename from pkg/services/meta/container.go rename to pkg/services/meta/containers.go diff --git a/pkg/services/meta/notifications.go b/pkg/services/meta/notifications.go index a13780c3af..c20fb4960b 100644 --- a/pkg/services/meta/notifications.go +++ b/pkg/services/meta/notifications.go @@ -8,7 +8,6 @@ import ( "time" "github.com/nspcc-dev/neo-go/pkg/core/block" - "github.com/nspcc-dev/neo-go/pkg/core/mpt" "github.com/nspcc-dev/neo-go/pkg/core/state" "github.com/nspcc-dev/neo-go/pkg/neorpc" "github.com/nspcc-dev/neo-go/pkg/rpcclient" @@ -229,93 +228,6 @@ outer: return cli, nil } -func (m *Meta) handleBlock(b *block.Header) error { - h := b.Hash() - ind := b.Index - l := m.l.With(zap.Stringer("block hash", h), zap.Uint32("index", ind)) - l.Debug("handling block") - - evName := objPutEvName - res, err := m.ws.GetBlockNotifications(h, &neorpc.NotificationFilter{ - Contract: &m.cnrH, - Name: &evName, - }) - if err != nil { - return fmt.Errorf("fetching %s block: %w", h, err) - } - - if len(res.Application) == 0 { - return nil - } - - m.m.RLock() - defer m.m.RUnlock() - - for _, n := range res.Application { - ev, err := parseObjNotification(n) - if err != nil { - l.Error("invalid object notification received", zap.Error(err)) - continue - } - - s, ok := m.storages[ev.cID] - if !ok { - l.Debug("skipping object notification", zap.Stringer("inactual container", ev.cID)) - continue - } - - err = m.handleObjectNotification(s, ev) - if err != nil { - l.Error("handling object notification", zap.Error(err)) - continue - } - - l.Debug("handled object notification successfully", zap.Stringer("cID", ev.cID), zap.Stringer("oID", ev.oID)) - } - - for _, st := range m.storages { - // TODO: parallelize depending on what can parallelize well - - st.m.Lock() - - root := st.mpt.StateRoot() - st.mpt.Store.Put([]byte{rootKey}, root[:]) - p := st.path - if st.opsBatch != nil { - _, err := st.mpt.PutBatch(mpt.MapToMPTBatch(st.opsBatch)) - if err != nil { - st.m.Unlock() - return fmt.Errorf("put batch for %d block to %q storage: %w", ind, p, err) - } - - st.opsBatch = nil - } - - st.m.Unlock() - - st.mpt.Flush(ind) - } - - l.Debug("handled block successfully") - - return nil -} - -func (m *Meta) blockFetcher(ctx context.Context, buff <-chan *block.Header) { - for { - select { - case <-ctx.Done(): - return - case b := <-buff: - err := m.handleBlock(b) - if err != nil { - m.l.Error("block handling failed", zap.Error(err)) - continue - } - } - } -} - const ( // MPT key prefixes. oidIndex = iota