Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push metrics #5025

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func New(
chainBackend transaction.Backend,
cors []string,
stamperStore storage.Store,
registry *prometheus.Registry,
) *Service {
s := new(Service)

Expand All @@ -297,7 +298,7 @@ func New(
s.transaction = transaction
s.batchStore = batchStore
s.chainBackend = chainBackend
s.metricsRegistry = newDebugMetrics()
s.metricsRegistry = registry
s.preMapHooks = map[string]func(v string) (string, error){
"mimeMediaType": func(v string) (string, error) {
typ, _, err := mime.ParseMediaType(v)
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
"github.com/ethersphere/bee/v2/pkg/pingpong"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand Down Expand Up @@ -212,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
o.BeeMode = api.FullMode
}

s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New())
s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry())
testutil.CleanupCloser(t, s)

s.SetP2P(o.P2P)
Expand Down Expand Up @@ -364,7 +365,7 @@ func TestParseName(t *testing.T) {
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)

s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New())
s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry())
s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil)
s.Mount()
s.EnableFullAPI()
Expand Down
32 changes: 0 additions & 32 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"strconv"
"time"

"github.com/ethersphere/bee/v2"
m "github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)

const bytesInKB = 1000
Expand Down Expand Up @@ -134,33 +132,3 @@ func (rw *responseWriter) WriteHeader(code int) {
rw.UpgradedResponseWriter.WriteHeader(code)
rw.wroteHeader = true
}

func newDebugMetrics() (r *prometheus.Registry) {
r = prometheus.NewRegistry()

// register standard metrics
r.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
Namespace: m.Namespace,
}),
collectors.NewGoCollector(),
prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Name: "info",
Help: "Bee information.",
ConstLabels: prometheus.Labels{
"version": bee.Version,
},
}),
)

return r
}

func (s *Service) MetricsRegistry() *prometheus.Registry {
return s.metricsRegistry
}

func (s *Service) MustRegisterMetrics(cs ...prometheus.Collector) {
s.metricsRegistry.MustRegister(cs...)
}
86 changes: 86 additions & 0 deletions pkg/metrics/registry/registery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package registry

import (
"context"
"time"

"github.com/ethersphere/bee/v2"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/push"
)

type Registry struct {
register *prometheus.Registry
pushRegister *prometheus.Registry
}

func NewRegistry(push bool) *Registry {
r := &Registry{
register: prometheus.NewRegistry(),
pushRegister: prometheus.NewRegistry(),
}

c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
Namespace: metrics.Namespace,
})

g := collectors.NewGoCollector()

v := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Name: "info",
Help: "Bee information.",
ConstLabels: prometheus.Labels{
"version": bee.Version,
},
})

r.MustRegister(c, g, v)

if push {
r.MustPushRegister(c, g, v)
}

return r
}

func (r *Registry) MetricsRegistry() *prometheus.Registry {
return r.register
}

func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error {
metricsPusher := push.New(path, job).Collector(r.pushRegister)

ctx, cancel := context.WithCancel(ctx)

go func() {
select {
case <-ctx.Done():
return
case <-time.After(time.Minute):
if err := metricsPusher.Push(); err != nil {
logger.Debug("metrics push failed", "error", err)
}
}
}()

return func() error {
cancel()
return metricsPusher.Push()
}
}

func (r *Registry) MustRegister(cs ...prometheus.Collector) {
r.register.MustRegister(cs...)
}

func (r *Registry) MustPushRegister(cs ...prometheus.Collector) {
r.pushRegister.MustRegister(cs...)
}
3 changes: 2 additions & 1 deletion pkg/node/devnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/feeds/factory"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock"
mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand Down Expand Up @@ -360,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) {
}),
)

apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New())
apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false).MetricsRegistry())

apiService.Configure(signer, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
Expand Down
81 changes: 50 additions & 31 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/hive"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/p2p/libp2p"
"github.com/ethersphere/bee/v2/pkg/pingpong"
Expand Down Expand Up @@ -77,7 +78,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
promc "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/sha3"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -429,8 +429,12 @@ func NewBee(
b.stamperStoreCloser = stamperStore

var apiService *api.Service
var metricsRegistery *registry.Registry

if o.APIAddr != "" {

metricsRegistery = registry.NewRegistry(false)

if o.MutexProfile {
_ = runtime.SetMutexProfileFraction(1)
}
Expand All @@ -457,6 +461,7 @@ func NewBee(
chainBackend,
o.CORSAllowedOrigins,
stamperStore,
metricsRegistery.MetricsRegistry(),
)

apiService.Mount()
Expand Down Expand Up @@ -606,12 +611,6 @@ func NewBee(
}
}

var registry *promc.Registry

if apiService != nil {
registry = apiService.MetricsRegistry()
}

p2ps, err := libp2p.New(ctx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr,
Expand All @@ -620,7 +619,7 @@ func NewBee(
FullNode: o.FullNodeMode,
Nonce: nonce,
ValidateOverlay: chainEnabled,
Registry: registry,
Registry: metricsRegistery.MetricsRegistry(),
})
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
Expand Down Expand Up @@ -1142,48 +1141,68 @@ func NewBee(

if o.APIAddr != "" {
// register metrics from components
apiService.MustRegisterMetrics(p2ps.Metrics()...)
apiService.MustRegisterMetrics(pingPong.Metrics()...)
apiService.MustRegisterMetrics(acc.Metrics()...)
apiService.MustRegisterMetrics(localStore.Metrics()...)
apiService.MustRegisterMetrics(kad.Metrics()...)
apiService.MustRegisterMetrics(saludService.Metrics()...)
apiService.MustRegisterMetrics(stateStoreMetrics.Metrics()...)
metricsRegistery.MustRegister(p2ps.Metrics()...)
metricsRegistery.MustRegister(pingPong.Metrics()...)
metricsRegistery.MustRegister(acc.Metrics()...)
metricsRegistery.MustRegister(localStore.Metrics()...)
metricsRegistery.MustRegister(kad.Metrics()...)
metricsRegistery.MustRegister(saludService.Metrics()...)
metricsRegistery.MustRegister(stateStoreMetrics.Metrics()...)

if pullerService != nil {
apiService.MustRegisterMetrics(pullerService.Metrics()...)
metricsRegistery.MustRegister(pullerService.Metrics()...)
}

if agent != nil {
apiService.MustRegisterMetrics(agent.Metrics()...)
metricsRegistery.MustRegister(agent.Metrics()...)
}

apiService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
apiService.MustRegisterMetrics(pusherService.Metrics()...)
apiService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
apiService.MustRegisterMetrics(retrieval.Metrics()...)
apiService.MustRegisterMetrics(lightNodes.Metrics()...)
apiService.MustRegisterMetrics(hive.Metrics()...)
metricsRegistery.MustRegister(pushSyncProtocol.Metrics()...)
metricsRegistery.MustRegister(pusherService.Metrics()...)
metricsRegistery.MustRegister(pullSyncProtocol.Metrics()...)
metricsRegistery.MustRegister(retrieval.Metrics()...)
metricsRegistery.MustRegister(lightNodes.Metrics()...)
metricsRegistery.MustRegister(hive.Metrics()...)

// TODO: remove this when we have a push metrics opt in configuration
if true {

// TODO: remove this when we have an include option in the configuration
include := []string{"pusher", "pushsync", "pullsync", "localstore"}

for _, m := range include {
switch m {
case "pusher":
metricsRegistery.MustPushRegister(pusherService.Metrics()...)
case "pushsync":
metricsRegistery.MustPushRegister(pushSyncProtocol.Metrics()...)
case "pullsync":
metricsRegistery.MustPushRegister(pullSyncProtocol.Metrics()...)
case "localstore":
metricsRegistery.MustPushRegister(localStore.Metrics()...)
}
}
}

if bs, ok := batchStore.(metrics.Collector); ok {
apiService.MustRegisterMetrics(bs.Metrics()...)
metricsRegistery.MustRegister(bs.Metrics()...)
}
if ls, ok := eventListener.(metrics.Collector); ok {
apiService.MustRegisterMetrics(ls.Metrics()...)
metricsRegistery.MustRegister(ls.Metrics()...)
}
if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
apiService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
metricsRegistery.MustRegister(pssServiceMetrics.Metrics()...)
}
if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok {
apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...)
metricsRegistery.MustRegister(swapBackendMetrics.Metrics()...)
}

if l, ok := logger.(metrics.Collector); ok {
apiService.MustRegisterMetrics(l.Metrics()...)
metricsRegistery.MustRegister(l.Metrics()...)
}
apiService.MustRegisterMetrics(pseudosettleService.Metrics()...)

metricsRegistery.MustRegister(pseudosettleService.Metrics()...)
if swapService != nil {
apiService.MustRegisterMetrics(swapService.Metrics()...)
metricsRegistery.MustRegister(swapService.Metrics()...)
}

apiService.Configure(signer, tracer, api.Options{
Expand Down
Loading