From b1073bb367ae0bcd52d5e0710cfd9e85b9f7bd0e Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 31 Jan 2025 06:12:29 +0300 Subject: [PATCH] node/meta: catch meta notifications from chain Store caught information in MPT structures. Closes #3070. Signed-off-by: Pavel Karpy --- cmd/neofs-node/config.go | 15 ++ .../config/internal/validate/config.go | 4 + cmd/neofs-node/config/meta/config.go | 17 +++ cmd/neofs-node/config/meta/config_test.go | 29 ++++ cmd/neofs-node/main.go | 1 + cmd/neofs-node/meta.go | 135 ++++++++++++++++++ config/example/node.env | 3 + config/example/node.json | 3 + config/example/node.yaml | 3 + docs/storage-node-configuration.md | 12 ++ 10 files changed, 222 insertions(+) create mode 100644 cmd/neofs-node/config/meta/config.go create mode 100644 cmd/neofs-node/config/meta/config_test.go create mode 100644 cmd/neofs-node/meta.go diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index 8bfc1a7d4f..45e2c6eff7 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -26,6 +26,7 @@ import ( fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree" fschainconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/fschain" loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger" + metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta" nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node" objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object" policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer" @@ -46,6 +47,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/network/cache" "github.com/nspcc-dev/neofs-node/pkg/services/control" controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server" + "github.com/nspcc-dev/neofs-node/pkg/services/meta" getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" "github.com/nspcc-dev/neofs-node/pkg/services/policer" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" @@ -94,6 +96,10 @@ type applicationConfiguration struct { timestamp bool } + metadata struct { + path string + } + engine struct { errorThreshold uint32 shardPoolSize uint32 @@ -159,6 +165,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error { a.policer.replicationCooldown = policerconfig.ReplicationCooldown(c) a.policer.objectBatchSize = policerconfig.ObjectBatchSize(c) + // Meta data + + a.metadata.path = metaconfig.Path(c) + // Storage Engine a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c) @@ -396,6 +406,7 @@ type cfg struct { // configuration of the internal // services cfgGRPC cfgGRPC + cfgMeta cfgMeta cfgMorph cfgMorph cfgContainer cfgContainer cfgNodeInfo cfgNodeInfo @@ -427,6 +438,10 @@ type cfgGRPC struct { servers []*grpc.Server } +type cfgMeta struct { + cLister meta.ContainerLister +} + type cfgMorph struct { client *client.Client diff --git a/cmd/neofs-node/config/internal/validate/config.go b/cmd/neofs-node/config/internal/validate/config.go index f01b1b2590..a53edd34c7 100644 --- a/cmd/neofs-node/config/internal/validate/config.go +++ b/cmd/neofs-node/config/internal/validate/config.go @@ -23,6 +23,10 @@ type valideConfig struct { ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"` } `mapstructure:"prometheus"` + Meta struct { + Path string `mapstructure:"path"` + } `mapstructure:"metadata"` + Node struct { Wallet struct { Path string `mapstructure:"path"` diff --git a/cmd/neofs-node/config/meta/config.go b/cmd/neofs-node/config/meta/config.go new file mode 100644 index 0000000000..ae0a303b82 --- /dev/null +++ b/cmd/neofs-node/config/meta/config.go @@ -0,0 +1,17 @@ +package metaconfig + +import ( + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" +) + +const ( + subsection = "metadata" +) + +// Path returns the value of "path" config parameter +// from "metadata" section. +// +// Returns empty string if the value is missing or invalid. +func Path(c *config.Config) string { + return config.StringSafe(c.Sub(subsection), "path") +} diff --git a/cmd/neofs-node/config/meta/config_test.go b/cmd/neofs-node/config/meta/config_test.go new file mode 100644 index 0000000000..186b528de3 --- /dev/null +++ b/cmd/neofs-node/config/meta/config_test.go @@ -0,0 +1,29 @@ +package metaconfig_test + +import ( + "testing" + + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta" + configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test" + "github.com/stretchr/testify/require" +) + +func TestLoggerSection_Level(t *testing.T) { + t.Run("defaults", func(t *testing.T) { + emptyConfig := configtest.EmptyConfig() + require.Equal(t, "", metaconfig.Path(emptyConfig)) + }) + + const path = "../../../../config/example/node" + + var fileConfigTest = func(c *config.Config) { + require.Equal(t, "path/to/meta", metaconfig.Path(c)) + } + + configtest.ForEachFileType(path, fileConfigTest) + + t.Run("ENV", func(t *testing.T) { + configtest.ForEnvFileType(path, fileConfigTest) + }) +} diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 98062b05b4..1dd0ef3a64 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -136,6 +136,7 @@ func initApp(c *cfg) { initAndLog(c, "session", initSessionService) initAndLog(c, "reputation", initReputationService) initAndLog(c, "object", initObjectService) + initAndLog(c, "meta", initMeta) initAndLog(c, "tree", initTreeService) initAndLog(c, "morph notifications", listenMorphNotifications) diff --git a/cmd/neofs-node/meta.go b/cmd/neofs-node/meta.go new file mode 100644 index 0000000000..cd72aed490 --- /dev/null +++ b/cmd/neofs-node/meta.go @@ -0,0 +1,135 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "slices" + "sync" + + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" + cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" + "github.com/nspcc-dev/neofs-node/pkg/services/meta" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +func initMeta(c *cfg) { + if c.cfgMorph.client == nil { + initMorphComponents(c) + } + + c.cfgMeta.cLister = &containerListener{ + key: c.binPublicKey, + cnrClient: c.basics.cCli, + containers: c.cfgObject.cnrSource, + network: c.basics.netMapSource, + } + + m, err := meta.New(c.log.With(zap.String("service", "meta data")), + c.cfgMeta.cLister, + c.applicationConfiguration.fsChain.dialTimeout, + c.applicationConfiguration.fsChain.endpoints, + c.basics.containerSH, + c.basics.netmapSH, + c.applicationConfiguration.metadata.path) + fatalOnErr(err) + + c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { + err = m.Run(ctx) + if err != nil { + c.internalErr <- fmt.Errorf("meta data service error: %w", err) + } + })) +} + +type containerListener struct { + key []byte + + cnrClient *cntClient.Client + containers container.Source + network netmap.Source + + m sync.RWMutex + prevCnrs []cid.ID + prevNetMap *netmapsdk.NetMap + prevRes map[cid.ID]struct{} +} + +func (c *containerListener) List() (map[cid.ID]struct{}, error) { + actualContainers, err := c.cnrClient.List(nil) + if err != nil { + return nil, fmt.Errorf("read containers: %w", err) + } + curEpoch, err := c.network.Epoch() + if err != nil { + return nil, fmt.Errorf("read current NeoFS epoch: %w", err) + } + networkMap, err := c.network.GetNetMapByEpoch(curEpoch) + if err != nil { + return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err) + } + + c.m.RLock() + if c.prevNetMap != nil && c.prevCnrs != nil { + cnrsSame := slices.Equal(c.prevCnrs, actualContainers) + if !cnrsSame { + c.m.RUnlock() + return c.prevRes, nil + } + netmapSame := slices.EqualFunc(c.prevNetMap.Nodes(), networkMap.Nodes(), func(n1 netmapsdk.NodeInfo, n2 netmapsdk.NodeInfo) bool { + return bytes.Equal(n1.PublicKey(), n2.PublicKey()) + }) + if netmapSame { + c.m.RUnlock() + return c.prevRes, nil + } + } + c.m.RUnlock() + + var locM sync.Mutex + res := make(map[cid.ID]struct{}) + var wg errgroup.Group + for _, cID := range actualContainers { + wg.Go(func() error { + cnr, err := c.containers.Get(cID) + if err != nil { + return fmt.Errorf("read %s container: %w", cID, err) + } + + nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cID) + if err != nil { + return fmt.Errorf("apply container storage policy to %s container: %w", cID, err) + } + + for _, nodeSet := range nodeSets { + for _, node := range nodeSet { + if bytes.Equal(node.PublicKey(), c.key) { + locM.Lock() + res[cID] = struct{}{} + locM.Unlock() + return nil + } + } + } + + return nil + }) + } + + err = wg.Wait() + if err != nil { + return nil, err + } + + c.m.Lock() + c.prevCnrs = actualContainers + c.prevNetMap = networkMap + c.prevRes = res + c.m.Unlock() + + return res, nil +} diff --git a/config/example/node.env b/config/example/node.env index 9b550f5b0f..eae08c75da 100644 --- a/config/example/node.env +++ b/config/example/node.env @@ -22,6 +22,9 @@ NEOFS_NODE_RELAY=true NEOFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions NEOFS_NODE_PERSISTENT_STATE_PATH=/state +# Meta data section +NEOFS_METADATA_PATH=path/to/meta + # Tree service section NEOFS_TREE_ENABLED=true NEOFS_TREE_CACHE_SIZE=15 diff --git a/config/example/node.json b/config/example/node.json index 7df9b34626..b4731c1c6e 100644 --- a/config/example/node.json +++ b/config/example/node.json @@ -37,6 +37,9 @@ "path": "/state" } }, + "metadata": { + "path": "path/to/meta" + }, "grpc": [ { "endpoint": "s01.neofs.devenv:8080", diff --git a/config/example/node.yaml b/config/example/node.yaml index 299a616a14..3cc1714c04 100644 --- a/config/example/node.yaml +++ b/config/example/node.yaml @@ -105,6 +105,9 @@ object: put: pool_size_remote: 100 # number of async workers for remote PUT operations +metadata: + path: path/to/meta # path to meta data storages, required + storage: # note: shard configuration can be omitted for relay node (see `node.relay`) shard_pool_size: 15 # size of per-shard worker pools used for PUT operations diff --git a/docs/storage-node-configuration.md b/docs/storage-node-configuration.md index 9a0383536d..a754307381 100644 --- a/docs/storage-node-configuration.md +++ b/docs/storage-node-configuration.md @@ -25,6 +25,7 @@ There are some custom types used for brevity: | `replicator` | [Replicator service configuration](#replicator-section) | | `storage` | [Storage engine configuration](#storage-section) | | `grpc` | [gRPC configuration](#grpc-section) | +| `metadata` | [Meta service configuration](#meta-section) | | `node` | [Node configuration](#node-section) | | `object` | [Object service configuration](#object-section) | | `tree` | [Tree service configuration](#tree-section) | @@ -300,6 +301,17 @@ pilorama: +# `metadata` section + +```yaml +metadata: + path: path/to/meta +``` + +| Parameter | Type | Default value | Description | +|-------------------|------------|---------------|---------------------------------------| +| `path` | `string` | | Path to meta data storages, required. | + # `node` section ```yaml