Skip to content

Commit

Permalink
FS-1259; Basic pushgateway metrics collection (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakeschuurmans authored Mar 13, 2024
1 parent c58b43d commit 72432c9
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 6 deletions.
23 changes: 17 additions & 6 deletions cmd/inventory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"github.com/equinix-labs/otel-init-go/otelinit"
"github.com/metal-toolbox/fleet-scheduler/internal/app"
"github.com/metal-toolbox/fleet-scheduler/internal/client"
"github.com/metal-toolbox/fleet-scheduler/internal/metrics"
"github.com/metal-toolbox/fleet-scheduler/internal/version"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/net/context"
Expand Down Expand Up @@ -34,12 +36,6 @@ func init() {
}

func inventory(ctx context.Context) error {
otelCtx, otelShutdown := otelinit.InitOpenTelemetry(ctx, "fleet-scheduler")
defer otelShutdown(ctx)

otelCtxWithCancel, cancelFunc := context.WithCancel(otelCtx)
defer cancelFunc()

cfg, err := app.LoadConfig(cfgFile)
if err != nil {
return err
Expand All @@ -51,6 +47,19 @@ func inventory(ctx context.Context) error {
return err
}

metricsPusher := metrics.NewPusher(logger, "inventory")
metricsPusher.AddCollector(collectors.NewGoCollector())
err = metricsPusher.Start()
if err != nil {
return err
}

otelCtx, otelShutdown := otelinit.InitOpenTelemetry(ctx, "fleet-scheduler")
defer otelShutdown(ctx)

otelCtxWithCancel, cancelFunc := context.WithCancel(otelCtx)
defer cancelFunc()

v := version.Current()
logger.WithFields(logrus.Fields{
"GitCommit": v.GitCommit,
Expand All @@ -71,5 +80,7 @@ func inventory(ctx context.Context) error {

logger.Info("Task: 'CreateConditionInventoryForAllServers' complete")

metricsPusher.KillAndWait()

return nil
}
102 changes: 102 additions & 0 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package metrics

import (
"errors"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/sirupsen/logrus"
)

// ErrorMetricsAlreadyRunning is returned by (*Pusher).Start when the pusher
// is already marked as live.
var ErrorMetricsAlreadyRunning = errors.New("metrics server is already running")

// Pusher is a small wrapper around prometheus's Pusher to allow you to easily set up metrics to be pushed to a prometheus-pushgateway. It is thread safe
type Pusher struct {
pusher *push.Pusher
mu sync.Mutex
wg sync.WaitGroup
live bool
log *logrus.Logger
}

// NewPusher builds a Pusher that logs to logger and pushes to the
// fleet-pushgateway under the job name given by task. The returned Pusher is
// not running; no goroutine is started here.
func NewPusher(logger *logrus.Logger, task string) *Pusher {
	// The mutex, wait group and live flag are left at their zero values.
	return &Pusher{
		pusher: push.New("http://fleet-pushgateway:9090/", task),
		log:    logger,
	}
}

// AddCollector attaches collector to the underlying prometheus pusher while
// holding the Pusher's mutex.
func (mp *Pusher) AddCollector(collector prometheus.Collector) {
	mp.mu.Lock()
	defer mp.mu.Unlock()

	// Collector returns a *push.Pusher; keep whatever it hands back.
	withCollector := mp.pusher.Collector(collector)
	mp.pusher = withCollector
}

// Start performs one synchronous push to verify that the pushgateway is
// reachable and then launches a background goroutine that sleeps one second,
// pushes, and repeats until KillAndWait clears the live flag or a push fails.
//
// It returns ErrorMetricsAlreadyRunning if the pusher is already live, or the
// error from the initial push if that push fails (in which case the pusher is
// left stopped and Start may be called again).
func (mp *Pusher) Start() error {
	// Check and claim the live flag in a single critical section so that two
	// concurrent Start calls cannot both launch a goroutine. Setting live and
	// registering with the wait group here, rather than inside the goroutine,
	// also means a KillAndWait issued right after Start returns cannot have
	// its live=false overwritten by a late-starting goroutine.
	mp.mu.Lock()
	if mp.live {
		mp.mu.Unlock()
		return ErrorMetricsAlreadyRunning
	}
	mp.live = true
	mp.wg.Add(1)
	mp.mu.Unlock()

	// stop marks the pusher as not running before releasing the wait group,
	// so IsAlive is already false once KillAndWait returns.
	stop := func() {
		mp.mu.Lock()
		mp.live = false
		mp.mu.Unlock()

		mp.wg.Done()
	}

	// First test that the pushgateway is accessible by running an initial push
	if err := mp.Push(); err != nil {
		stop()
		return err
	}

	go func() {
		// Also runs when a push fails, so a dead loop is not reported as
		// alive and Start can be called again afterwards.
		defer stop()

		for mp.IsAlive() {
			time.Sleep(time.Second)

			// Pushing after the sleep, before re-checking the flag, gives one
			// last push after KillAndWait has been requested.
			if err := mp.Push(); err != nil {
				mp.log.Errorf("Failed to push metrics to gateway: %s", err.Error())
				return
			}
		}
	}()

	return nil
}

// IsAlive reports the current value of the live flag, read under the mutex.
func (mp *Pusher) IsAlive() bool {
	mp.mu.Lock()
	alive := mp.live
	mp.mu.Unlock()

	return alive
}

// KillAndWait clears the live flag and then blocks until the wait group
// tracking the background push goroutine is released.
func (mp *Pusher) KillAndWait() {
	// Scope the lock to the flag update only; waiting while holding it would
	// block the push loop, which takes the same mutex.
	func() {
		mp.mu.Lock()
		defer mp.mu.Unlock()

		mp.live = false
	}()

	mp.wg.Wait()
}

// Push sends the collected metrics to the pushgateway once, holding the mutex
// for the duration of the call, and returns the underlying pusher's error.
func (mp *Pusher) Push() error {
	mp.mu.Lock()
	defer mp.mu.Unlock()

	pushErr := mp.pusher.Push()

	return pushErr
}

0 comments on commit 72432c9

Please sign in to comment.