Skip to content

Commit

Permalink
Refactor gauge strategy (#284)
Browse files Browse the repository at this point in the history
* Refactor gauge strategy to use function
  • Loading branch information
Garbett1 authored Mar 4, 2024
1 parent 7b23f14 commit ceed86e
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ var totalRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "requests_total",
})
var currentRequests = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "mettle",
Name: "requests_current",
})

var totalFailedActions = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Expand Down Expand Up @@ -105,7 +101,6 @@ var deleteJobsDurations = prometheus.NewHistogram(prometheus.HistogramOpts{

var metrics = []prometheus.Collector{
totalRequests,
currentRequests,
totalFailedActions,
totalSuccessfulActions,
timeToComplete,
Expand All @@ -117,6 +112,8 @@ var metrics = []prometheus.Collector{
deleteJobsDurations,
}

var register sync.Once

func init() {
for _, metric := range metrics {
prometheus.MustRegister(metric)
Expand Down Expand Up @@ -168,6 +165,21 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
numPollers: queueOpts.NumPollers,
deleteJobsTicker: time.NewTicker(10 * time.Minute),
}

// _Technically_ this won't happen more than once in normal running, as we'd only run 1 server, but it does happen in tests.
register.Do(func() {
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "mettle",
Name: "requests_current",
},
func() float64 {
srv.mutex.Lock()
defer srv.mutex.Unlock()
return float64(len(srv.jobs))
},
))
})

log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
log.Notice(" %s: %s", k, strings.Join(v, ", "))
Expand All @@ -177,7 +189,6 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
log.Warningf("Failed to get inflight executions: %s", err)
} else if len(jobs) > 0 {
srv.jobs = jobs
currentRequests.Set(float64(len(srv.jobs)))
log.Notice("Updated server with %d inflight executions", len(srv.jobs))
}
go srv.Receive()
Expand Down Expand Up @@ -404,7 +415,6 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
s.jobs[digest.Hash] = j
log.Debug("Created job for %s", digest.Hash)
totalRequests.Inc()
currentRequests.Inc()
created = true
} else if create && time.Since(j.LastUpdate) >= resumptionTime {
// In this path we think the job is too old to be relevant; we don't actually create
Expand Down Expand Up @@ -522,7 +532,6 @@ func (s *server) process(msg *pubsub.Message) {
LastUpdate: time.Now(),
}
s.jobs[key] = j
currentRequests.Inc()
}
if metadata.Stage != pb.ExecutionStage_QUEUED || !j.SentFirst {
// Only send QUEUED messages if they're the first one. This prevents us from
Expand Down Expand Up @@ -570,7 +579,6 @@ func (s *server) periodicallyDeleteJobs() {
for digest, job := range s.jobs {
if shouldDeleteJob(job) {
delete(s.jobs, digest)
currentRequests.Dec()
}
}
s.mutex.Unlock()
Expand Down

0 comments on commit ceed86e

Please sign in to comment.