Skip to content

Commit

Permalink
feat(pruner): adding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Jan 8, 2024
1 parent 2ad9624 commit dcdb874
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 3 deletions.
2 changes: 2 additions & 0 deletions nodebuilder/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/state"
)

Expand Down Expand Up @@ -93,6 +94,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]),
fx.Invoke(node.WithMetrics),
fx.Invoke(share.WithDiscoveryMetrics),
fx.Invoke(pruner.WithPrunerMetrics),
)

samplingMetrics := fx.Options(
Expand Down
79 changes: 79 additions & 0 deletions pruner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package pruner

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
meter = otel.Meter("storage_pruner")
)

type metrics struct {
prunedCounter metric.Int64Counter

lastPruned metric.Int64ObservableGauge
failedPrunes metric.Int64ObservableGauge
}

func (s *Service) WithMetrics() error {
prunedCounter, err := meter.Int64Counter("pruner_pruned_counter",
metric.WithDescription("pruner pruned header counter"))
if err != nil {
return err
}

failedPrunes, err := meter.Int64ObservableGauge("pruner_failed_counter",
metric.WithDescription("pruner failed prunes counter"))
if err != nil {
return err
}

lastPruned, err := meter.Int64ObservableGauge("pruner_last_pruned",
metric.WithDescription("pruner highest pruned height"))
if err != nil {
return err
}

callback := func(ctx context.Context, observer metric.Observer) error {
observer.ObserveInt64(failedPrunes, int64(len(s.failedHeaders)))
return nil
}

if _, err := meter.RegisterCallback(callback, failedPrunes); err != nil {
return err
}

callback = func(ctx context.Context, observer metric.Observer) error {
lastPrunedHeader := s.checkpoint.lastPrunedHeader.Load()
if lastPrunedHeader != nil {
observer.ObserveInt64(lastPruned, int64(lastPrunedHeader.Height()))
}
return nil
}

if _, err := meter.RegisterCallback(callback, lastPruned); err != nil {
return err
}

s.metrics = &metrics{
prunedCounter: prunedCounter,
lastPruned: lastPruned,
failedPrunes: failedPrunes,
}
return nil
}

func (m *metrics) observePrune(ctx context.Context, failed bool) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}
m.prunedCounter.Add(ctx, 1, metric.WithAttributes(
attribute.Bool("failed", failed)))
}
6 changes: 6 additions & 0 deletions pruner/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ func WithDisabledGC() Option {
p.gcCycle = time.Duration(0)
}
}

// WithPrunerMetrics is a utility function to turn on pruner metrics and that is
// expected to be "invoked" by the fx lifecycle.
func WithPrunerMetrics(s *Service) error {
return s.WithMetrics()
}
12 changes: 9 additions & 3 deletions pruner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ import (
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"

hdr "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
)

var log = logging.Logger("pruner/service")

// Service handles the pruning routine for the node using the
// prune Pruner.
type Service struct {
Expand All @@ -21,14 +24,16 @@ type Service struct {
getter hdr.Getter[*header.ExtendedHeader] // TODO @renaynay: expects a header service with access to sync head

checkpoint *checkpoint
failedHeaders map[uint64]error
maxPruneablePerGC uint64
numBlocksInWindow uint64

ctx context.Context
cancel context.CancelFunc
doneCh chan struct{}

params Params
params Params
metrics *metrics
}

func NewService(
Expand Down Expand Up @@ -110,9 +115,10 @@ func (s *Service) prune() {
err = s.pruner.Prune(pruneCtx, eh)
if err != nil {
// TODO: @distractedm1nd: updatecheckpoint should be called on the last NON-ERRORED header
// TODO @renaynay: record + report errors properly
continue
log.Errorf("failed to prune header %d: %s", eh.Height(), err)
s.failedHeaders[eh.Height()] = err
}
s.metrics.observePrune(pruneCtx, err != nil)
}
cancel()

Expand Down

0 comments on commit dcdb874

Please sign in to comment.