Skip to content

Commit

Permalink
node/meta: catch meta notifications from chain
Browse files Browse the repository at this point in the history
Store caught information in MPT structures. Closes #3070.

Signed-off-by: Pavel Karpy <[email protected]>
  • Loading branch information
carpawell committed Feb 14, 2025
1 parent 2b6a1e5 commit b1073bb
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 0 deletions.
15 changes: 15 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
fstreeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/engine/shard/blobstor/fstree"
fschainconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/fschain"
loggerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/logger"
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
nodeconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/node"
objectconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/object"
policerconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/policer"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/network/cache"
"github.com/nspcc-dev/neofs-node/pkg/services/control"
controlSvc "github.com/nspcc-dev/neofs-node/pkg/services/control/server"
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
Expand Down Expand Up @@ -94,6 +96,10 @@ type applicationConfiguration struct {
timestamp bool
}

metadata struct {
path string
}

engine struct {
errorThreshold uint32
shardPoolSize uint32
Expand Down Expand Up @@ -159,6 +165,10 @@ func (a *applicationConfiguration) readConfig(c *config.Config) error {
a.policer.replicationCooldown = policerconfig.ReplicationCooldown(c)
a.policer.objectBatchSize = policerconfig.ObjectBatchSize(c)

// Meta data

a.metadata.path = metaconfig.Path(c)

Check warning on line 171 in cmd/neofs-node/config.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/config.go#L168-L171

Added lines #L168 - L171 were not covered by tests
// Storage Engine

a.engine.errorThreshold = engineconfig.ShardErrorThreshold(c)
Expand Down Expand Up @@ -396,6 +406,7 @@ type cfg struct {
// configuration of the internal
// services
cfgGRPC cfgGRPC
cfgMeta cfgMeta
cfgMorph cfgMorph
cfgContainer cfgContainer
cfgNodeInfo cfgNodeInfo
Expand Down Expand Up @@ -427,6 +438,10 @@ type cfgGRPC struct {
servers []*grpc.Server
}

type cfgMeta struct {
cLister meta.ContainerLister
}

type cfgMorph struct {
client *client.Client

Expand Down
4 changes: 4 additions & 0 deletions cmd/neofs-node/config/internal/validate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ type valideConfig struct {
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
} `mapstructure:"prometheus"`

Meta struct {
Path string `mapstructure:"path"`
} `mapstructure:"metadata"`

Node struct {
Wallet struct {
Path string `mapstructure:"path"`
Expand Down
17 changes: 17 additions & 0 deletions cmd/neofs-node/config/meta/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package metaconfig

import (
"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
)

const (
subsection = "metadata"
)

// Path returns the value of "path" config parameter
// from "metadata" section.
//
// Returns empty string if the value is missing or invalid.
func Path(c *config.Config) string {
return config.StringSafe(c.Sub(subsection), "path")
}
29 changes: 29 additions & 0 deletions cmd/neofs-node/config/meta/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metaconfig_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/cmd/neofs-node/config"
metaconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/meta"
configtest "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/test"
"github.com/stretchr/testify/require"
)

func TestLoggerSection_Level(t *testing.T) {
t.Run("defaults", func(t *testing.T) {
emptyConfig := configtest.EmptyConfig()
require.Equal(t, "", metaconfig.Path(emptyConfig))
})

const path = "../../../../config/example/node"

var fileConfigTest = func(c *config.Config) {
require.Equal(t, "path/to/meta", metaconfig.Path(c))
}

configtest.ForEachFileType(path, fileConfigTest)

t.Run("ENV", func(t *testing.T) {
configtest.ForEnvFileType(path, fileConfigTest)
})
}
1 change: 1 addition & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func initApp(c *cfg) {
initAndLog(c, "session", initSessionService)
initAndLog(c, "reputation", initReputationService)
initAndLog(c, "object", initObjectService)
initAndLog(c, "meta", initMeta)

Check warning on line 139 in cmd/neofs-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/main.go#L139

Added line #L139 was not covered by tests
initAndLog(c, "tree", initTreeService)

initAndLog(c, "morph notifications", listenMorphNotifications)
Expand Down
135 changes: 135 additions & 0 deletions cmd/neofs-node/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package main

import (
"bytes"
"context"
"fmt"
"slices"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/services/meta"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func initMeta(c *cfg) {
if c.cfgMorph.client == nil {
initMorphComponents(c)
}

Check warning on line 23 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L20-L23

Added lines #L20 - L23 were not covered by tests

c.cfgMeta.cLister = &containerListener{
key: c.binPublicKey,
cnrClient: c.basics.cCli,
containers: c.cfgObject.cnrSource,
network: c.basics.netMapSource,
}

m, err := meta.New(c.log.With(zap.String("service", "meta data")),
c.cfgMeta.cLister,
c.applicationConfiguration.fsChain.dialTimeout,
c.applicationConfiguration.fsChain.endpoints,
c.basics.containerSH,
c.basics.netmapSH,
c.applicationConfiguration.metadata.path)
fatalOnErr(err)

c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) {
err = m.Run(ctx)
if err != nil {
c.internalErr <- fmt.Errorf("meta data service error: %w", err)
}

Check warning on line 45 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L25-L45

Added lines #L25 - L45 were not covered by tests
}))
}

type containerListener struct {
key []byte

cnrClient *cntClient.Client
containers container.Source
network netmap.Source

m sync.RWMutex
prevCnrs []cid.ID
prevNetMap *netmapsdk.NetMap
prevRes map[cid.ID]struct{}
}

func (c *containerListener) List() (map[cid.ID]struct{}, error) {
actualContainers, err := c.cnrClient.List(nil)
if err != nil {
return nil, fmt.Errorf("read containers: %w", err)
}
curEpoch, err := c.network.Epoch()
if err != nil {
return nil, fmt.Errorf("read current NeoFS epoch: %w", err)
}
networkMap, err := c.network.GetNetMapByEpoch(curEpoch)
if err != nil {
return nil, fmt.Errorf("read network map at the current epoch #%d: %w", curEpoch, err)
}

Check warning on line 74 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L62-L74

Added lines #L62 - L74 were not covered by tests

c.m.RLock()
if c.prevNetMap != nil && c.prevCnrs != nil {
cnrsSame := slices.Equal(c.prevCnrs, actualContainers)
if !cnrsSame {
c.m.RUnlock()
return c.prevRes, nil
}
netmapSame := slices.EqualFunc(c.prevNetMap.Nodes(), networkMap.Nodes(), func(n1 netmapsdk.NodeInfo, n2 netmapsdk.NodeInfo) bool {
return bytes.Equal(n1.PublicKey(), n2.PublicKey())
})
if netmapSame {
c.m.RUnlock()
return c.prevRes, nil
}

Check warning on line 89 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L76-L89

Added lines #L76 - L89 were not covered by tests
}
c.m.RUnlock()

var locM sync.Mutex
res := make(map[cid.ID]struct{})
var wg errgroup.Group
for _, cID := range actualContainers {
wg.Go(func() error {
cnr, err := c.containers.Get(cID)
if err != nil {
return fmt.Errorf("read %s container: %w", cID, err)
}

Check warning on line 101 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L91-L101

Added lines #L91 - L101 were not covered by tests

nodeSets, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cID)
if err != nil {
return fmt.Errorf("apply container storage policy to %s container: %w", cID, err)
}

Check warning on line 106 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L103-L106

Added lines #L103 - L106 were not covered by tests

for _, nodeSet := range nodeSets {
for _, node := range nodeSet {
if bytes.Equal(node.PublicKey(), c.key) {
locM.Lock()
res[cID] = struct{}{}
locM.Unlock()
return nil
}

Check warning on line 115 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L108-L115

Added lines #L108 - L115 were not covered by tests
}
}

return nil

Check warning on line 119 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L119

Added line #L119 was not covered by tests
})
}

err = wg.Wait()
if err != nil {
return nil, err
}

Check warning on line 126 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L123-L126

Added lines #L123 - L126 were not covered by tests

c.m.Lock()
c.prevCnrs = actualContainers
c.prevNetMap = networkMap
c.prevRes = res
c.m.Unlock()

return res, nil

Check warning on line 134 in cmd/neofs-node/meta.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/meta.go#L128-L134

Added lines #L128 - L134 were not covered by tests
}
3 changes: 3 additions & 0 deletions config/example/node.env
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ NEOFS_NODE_RELAY=true
NEOFS_NODE_PERSISTENT_SESSIONS_PATH=/sessions
NEOFS_NODE_PERSISTENT_STATE_PATH=/state

# Meta data section
NEOFS_METADATA_PATH=path/to/meta

# Tree service section
NEOFS_TREE_ENABLED=true
NEOFS_TREE_CACHE_SIZE=15
Expand Down
3 changes: 3 additions & 0 deletions config/example/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
"path": "/state"
}
},
"metadata": {
"path": "path/to/meta"
},
"grpc": [
{
"endpoint": "s01.neofs.devenv:8080",
Expand Down
3 changes: 3 additions & 0 deletions config/example/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ object:
put:
pool_size_remote: 100 # number of async workers for remote PUT operations

metadata:
path: path/to/meta # path to meta data storages, required

storage:
# note: shard configuration can be omitted for relay node (see `node.relay`)
shard_pool_size: 15 # size of per-shard worker pools used for PUT operations
Expand Down
12 changes: 12 additions & 0 deletions docs/storage-node-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ There are some custom types used for brevity:
| `replicator` | [Replicator service configuration](#replicator-section) |
| `storage` | [Storage engine configuration](#storage-section) |
| `grpc` | [gRPC configuration](#grpc-section) |
| `metadata` | [Meta service configuration](#meta-section) |
| `node` | [Node configuration](#node-section) |
| `object` | [Object service configuration](#object-section) |
| `tree` | [Tree service configuration](#tree-section) |
Expand Down Expand Up @@ -300,6 +301,17 @@ pilorama:



# `metadata` section

```yaml
metadata:
path: path/to/meta
```

| Parameter | Type | Default value | Description |
|-------------------|------------|---------------|---------------------------------------|
| `path` | `string` | | Path to meta data storages, required. |

# `node` section

```yaml
Expand Down

0 comments on commit b1073bb

Please sign in to comment.