Skip to content

feat: push metrics #5025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ require (
github.com/quic-go/quic-go v0.42.0 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/shirou/gopsutil v3.21.5+incompatible // indirect
github.com/shirou/gopsutil v3.21.5+incompatible
github.com/smartystreets/assertions v1.1.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.3.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func New(
chainBackend transaction.Backend,
cors []string,
stamperStore storage.Store,
registry *prometheus.Registry,
) *Service {
s := new(Service)

Expand All @@ -297,7 +298,7 @@ func New(
s.transaction = transaction
s.batchStore = batchStore
s.chainBackend = chainBackend
s.metricsRegistry = newDebugMetrics()
s.metricsRegistry = registry
s.preMapHooks = map[string]func(v string) (string, error){
"mimeMediaType": func(v string) (string, error) {
typ, _, err := mime.ParseMediaType(v)
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
p2pmock "github.com/ethersphere/bee/v2/pkg/p2p/mock"
"github.com/ethersphere/bee/v2/pkg/pingpong"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand Down Expand Up @@ -212,7 +213,7 @@ func newTestServer(t *testing.T, o testServerOptions) (*http.Client, *websocket.
o.BeeMode = api.FullMode
}

s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New())
s := api.New(o.PublicKey, o.PSSPublicKey, o.EthereumAddress, []string{o.WhitelistedAddr}, o.Logger, transaction, o.BatchStore, o.BeeMode, !o.ChequebookDisabled, !o.SwapDisabled, backend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, o.BeeMode.String()).MetricsRegistry())
testutil.CleanupCloser(t, s)

s.SetP2P(o.P2P)
Expand Down Expand Up @@ -364,7 +365,7 @@ func TestParseName(t *testing.T) {
pk, _ := crypto.GenerateSecp256k1Key()
signer := crypto.NewDefaultSigner(pk)

s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New())
s := api.New(pk.PublicKey, pk.PublicKey, common.Address{}, nil, log, nil, nil, 1, false, false, nil, []string{"*"}, inmemstore.New(), registry.NewRegistry(false, api.FullMode.String()).MetricsRegistry())
s.Configure(signer, nil, api.Options{}, api.ExtraOptions{Resolver: tC.res}, 1, nil)
s.Mount()
s.EnableFullAPI()
Expand Down
32 changes: 0 additions & 32 deletions pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"strconv"
"time"

"github.com/ethersphere/bee/v2"
m "github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
)

const bytesInKB = 1000
Expand Down Expand Up @@ -134,33 +132,3 @@ func (rw *responseWriter) WriteHeader(code int) {
rw.UpgradedResponseWriter.WriteHeader(code)
rw.wroteHeader = true
}

func newDebugMetrics() (r *prometheus.Registry) {
r = prometheus.NewRegistry()

// register standard metrics
r.MustRegister(
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
Namespace: m.Namespace,
}),
collectors.NewGoCollector(),
prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: m.Namespace,
Name: "info",
Help: "Bee information.",
ConstLabels: prometheus.Labels{
"version": bee.Version,
},
}),
)

return r
}

func (s *Service) MetricsRegistry() *prometheus.Registry {
return s.metricsRegistry
}

func (s *Service) MustRegisterMetrics(cs ...prometheus.Collector) {
s.metricsRegistry.MustRegister(cs...)
}
130 changes: 130 additions & 0 deletions pkg/metrics/registry/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright 2025 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package registry

import (
"context"
"time"

"github.com/ethersphere/bee/v2"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
)

type Registry struct {
register *prometheus.Registry
pushRegister *prometheus.Registry
cpuGauge prometheus.Gauge
memGauge prometheus.Gauge
}

func NewRegistry(push bool, mode string) *Registry {
r := &Registry{
register: prometheus.NewRegistry(),
pushRegister: prometheus.NewRegistry(),
}

c := collectors.NewProcessCollector(collectors.ProcessCollectorOpts{
Namespace: metrics.Namespace,
})

g := collectors.NewGoCollector()

// Create CPU and memory gauge metrics
cpuGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Name: "system_cpu_usage_percent",
Help: "System CPU usage percentage",
})

memGauge := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Name: "system_memory_usage_percent",
Help: "System memory usage percentage",
})

v := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Name: "info",
Help: "Bee information.",
ConstLabels: prometheus.Labels{
"version": bee.Version,
"mode": mode,
},
})

r.MustRegister(c, g, v, cpuGauge, memGauge)

if push {
r.MustPushRegister(c, g, v, cpuGauge, memGauge)
}

// Store references to gauges for updating in PushWorker
r.cpuGauge = cpuGauge
r.memGauge = memGauge

return r
}

func (r *Registry) MetricsRegistry() *prometheus.Registry {
return r.register
}

func (r *Registry) PushWorker(ctx context.Context, path string, job string, logger log.Logger) func() error {
metricsPusher := push.New(path, job).Collector(r.pushRegister)

ctx, cancel := context.WithCancel(ctx)

go func() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Collect CPU metrics
percentages, err := cpu.Percent(0, false)
if err == nil && len(percentages) > 0 {
r.cpuGauge.Set(percentages[0])
} else if err != nil {
logger.Debug("failed to collect CPU metrics", "error", err)
}

// Collect memory metrics
vm, err := mem.VirtualMemory()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think when the metrics are gathered for pushing, the mem and cpu metrics are already collected, I dont think you need to add them manually like this

if err == nil {
r.memGauge.Set(vm.UsedPercent)
} else {
logger.Debug("failed to collect memory metrics", "error", err)
}

// Push metrics
if err := metricsPusher.Push(); err != nil {
logger.Debug("metrics push failed", "error", err)
}
}
}
}()

return func() error {
cancel()
return metricsPusher.Push()
}
}

func (r *Registry) MustRegister(cs ...prometheus.Collector) {
r.register.MustRegister(cs...)
}

func (r *Registry) MustPushRegister(cs ...prometheus.Collector) {
r.pushRegister.MustRegister(cs...)
}
3 changes: 2 additions & 1 deletion pkg/node/devnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/feeds/factory"
"github.com/ethersphere/bee/v2/pkg/gsoc"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
mockP2P "github.com/ethersphere/bee/v2/pkg/p2p/mock"
mockPingPong "github.com/ethersphere/bee/v2/pkg/pingpong/mock"
"github.com/ethersphere/bee/v2/pkg/postage"
Expand Down Expand Up @@ -360,7 +361,7 @@ func NewDevBee(logger log.Logger, o *DevOptions) (b *DevBee, err error) {
}),
)

apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New())
apiService := api.New(mockKey.PublicKey, mockKey.PublicKey, overlayEthAddress, nil, logger, mockTransaction, batchStore, api.DevMode, true, true, chainBackend, o.CORSAllowedOrigins, inmemstore.New(), registry.NewRegistry(false, api.DevMode.String()).MetricsRegistry())

apiService.Configure(signer, tracer, api.Options{
CORSAllowedOrigins: o.CORSAllowedOrigins,
Expand Down
81 changes: 50 additions & 31 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/hive"
"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/metrics"
"github.com/ethersphere/bee/v2/pkg/metrics/registry"
"github.com/ethersphere/bee/v2/pkg/p2p"
"github.com/ethersphere/bee/v2/pkg/p2p/libp2p"
"github.com/ethersphere/bee/v2/pkg/pingpong"
Expand Down Expand Up @@ -77,7 +78,6 @@ import (
"github.com/ethersphere/bee/v2/pkg/util/syncutil"
"github.com/hashicorp/go-multierror"
ma "github.com/multiformats/go-multiaddr"
promc "github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/sha3"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -429,8 +429,12 @@ func NewBee(
b.stamperStoreCloser = stamperStore

var apiService *api.Service
var metricsRegistery *registry.Registry

if o.APIAddr != "" {

metricsRegistery = registry.NewRegistry(false, beeNodeMode.String())

if o.MutexProfile {
_ = runtime.SetMutexProfileFraction(1)
}
Expand All @@ -457,6 +461,7 @@ func NewBee(
chainBackend,
o.CORSAllowedOrigins,
stamperStore,
metricsRegistery.MetricsRegistry(),
)

apiService.Mount()
Expand Down Expand Up @@ -606,12 +611,6 @@ func NewBee(
}
}

var registry *promc.Registry

if apiService != nil {
registry = apiService.MetricsRegistry()
}

p2ps, err := libp2p.New(ctx, signer, networkID, swarmAddress, addr, addressbook, stateStore, lightNodes, logger, tracer, libp2p.Options{
PrivateKey: libp2pPrivateKey,
NATAddr: o.NATAddr,
Expand All @@ -620,7 +619,7 @@ func NewBee(
FullNode: o.FullNodeMode,
Nonce: nonce,
ValidateOverlay: chainEnabled,
Registry: registry,
Registry: metricsRegistery.MetricsRegistry(),
})
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
Expand Down Expand Up @@ -1142,48 +1141,68 @@ func NewBee(

if o.APIAddr != "" {
// register metrics from components
apiService.MustRegisterMetrics(p2ps.Metrics()...)
apiService.MustRegisterMetrics(pingPong.Metrics()...)
apiService.MustRegisterMetrics(acc.Metrics()...)
apiService.MustRegisterMetrics(localStore.Metrics()...)
apiService.MustRegisterMetrics(kad.Metrics()...)
apiService.MustRegisterMetrics(saludService.Metrics()...)
apiService.MustRegisterMetrics(stateStoreMetrics.Metrics()...)
metricsRegistery.MustRegister(p2ps.Metrics()...)
metricsRegistery.MustRegister(pingPong.Metrics()...)
metricsRegistery.MustRegister(acc.Metrics()...)
metricsRegistery.MustRegister(localStore.Metrics()...)
metricsRegistery.MustRegister(kad.Metrics()...)
metricsRegistery.MustRegister(saludService.Metrics()...)
metricsRegistery.MustRegister(stateStoreMetrics.Metrics()...)

if pullerService != nil {
apiService.MustRegisterMetrics(pullerService.Metrics()...)
metricsRegistery.MustRegister(pullerService.Metrics()...)
}

if agent != nil {
apiService.MustRegisterMetrics(agent.Metrics()...)
metricsRegistery.MustRegister(agent.Metrics()...)
}

apiService.MustRegisterMetrics(pushSyncProtocol.Metrics()...)
apiService.MustRegisterMetrics(pusherService.Metrics()...)
apiService.MustRegisterMetrics(pullSyncProtocol.Metrics()...)
apiService.MustRegisterMetrics(retrieval.Metrics()...)
apiService.MustRegisterMetrics(lightNodes.Metrics()...)
apiService.MustRegisterMetrics(hive.Metrics()...)
metricsRegistery.MustRegister(pushSyncProtocol.Metrics()...)
metricsRegistery.MustRegister(pusherService.Metrics()...)
metricsRegistery.MustRegister(pullSyncProtocol.Metrics()...)
metricsRegistery.MustRegister(retrieval.Metrics()...)
metricsRegistery.MustRegister(lightNodes.Metrics()...)
metricsRegistery.MustRegister(hive.Metrics()...)

// TODO: remove this when we have a push metrics opt in configuration
if true {

// TODO: remove this when we have an include option in the configuration
include := []string{"pusher", "pushsync", "pullsync", "localstore"}

for _, m := range include {
switch m {
case "pusher":
metricsRegistery.MustPushRegister(pusherService.Metrics()...)
case "pushsync":
metricsRegistery.MustPushRegister(pushSyncProtocol.Metrics()...)
case "pullsync":
metricsRegistery.MustPushRegister(pullSyncProtocol.Metrics()...)
case "localstore":
metricsRegistery.MustPushRegister(localStore.Metrics()...)
}
}
}

if bs, ok := batchStore.(metrics.Collector); ok {
apiService.MustRegisterMetrics(bs.Metrics()...)
metricsRegistery.MustRegister(bs.Metrics()...)
}
if ls, ok := eventListener.(metrics.Collector); ok {
apiService.MustRegisterMetrics(ls.Metrics()...)
metricsRegistery.MustRegister(ls.Metrics()...)
}
if pssServiceMetrics, ok := pssService.(metrics.Collector); ok {
apiService.MustRegisterMetrics(pssServiceMetrics.Metrics()...)
metricsRegistery.MustRegister(pssServiceMetrics.Metrics()...)
}
if swapBackendMetrics, ok := chainBackend.(metrics.Collector); ok {
apiService.MustRegisterMetrics(swapBackendMetrics.Metrics()...)
metricsRegistery.MustRegister(swapBackendMetrics.Metrics()...)
}

if l, ok := logger.(metrics.Collector); ok {
apiService.MustRegisterMetrics(l.Metrics()...)
metricsRegistery.MustRegister(l.Metrics()...)
}
apiService.MustRegisterMetrics(pseudosettleService.Metrics()...)

metricsRegistery.MustRegister(pseudosettleService.Metrics()...)
if swapService != nil {
apiService.MustRegisterMetrics(swapService.Metrics()...)
metricsRegistery.MustRegister(swapService.Metrics()...)
}

apiService.Configure(signer, tracer, api.Options{
Expand Down
Loading