diff --git a/mettle/api/api.go b/mettle/api/api.go index 0029bf5c..7edb0895 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -49,10 +49,6 @@ var totalRequests = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "mettle", Name: "requests_total", }) -var currentRequests = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: "mettle", - Name: "requests_current", -}) var totalFailedActions = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "mettle", @@ -105,7 +101,6 @@ var deleteJobsDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ var metrics = []prometheus.Collector{ totalRequests, - currentRequests, totalFailedActions, totalSuccessfulActions, timeToComplete, @@ -117,6 +112,8 @@ var metrics = []prometheus.Collector{ deleteJobsDurations, } +var register sync.Once + func init() { for _, metric := range metrics { prometheus.MustRegister(metric) @@ -171,6 +168,21 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, numPollers: queueOpts.NumPollers, deleteJobsTicker: time.NewTicker(10 * time.Minute), } + + // _Technically_ this won't happen more than once in normal running, as we'd only run 1 server, but it does happen in tests. + register.Do(func() { + prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: "mettle", + Name: "requests_current", + }, + func() float64 { + srv.mutex.Lock() + defer srv.mutex.Unlock() + return float64(len(srv.jobs)) + }, + )) + }) + log.Notice("Allowed platform values:") for k, v := range allowedPlatform { log.Notice(" %s: %s", k, strings.Join(v, ", ")) @@ -180,7 +192,6 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, log.Warningf("Failed to get inflight executions: %s", err) } else if len(jobs) > 0 { srv.jobs = jobs - currentRequests.Set(float64(len(srv.jobs))) log.Notice("Updated server with %d inflight executions", len(srv.jobs)) } go srv.Receive() @@ -407,7 +418,6 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin s.jobs[digest.Hash] = j log.Debug("Created job for %s", digest.Hash) totalRequests.Inc() - currentRequests.Inc() created = true } else if create && time.Since(j.LastUpdate) >= resumptionTime { // In this path we think the job is too old to be relevant; we don't actually create @@ -525,7 +535,6 @@ func (s *server) process(msg *pubsub.Message) { LastUpdate: time.Now(), } s.jobs[key] = j - currentRequests.Inc() } if metadata.Stage != pb.ExecutionStage_QUEUED || !j.SentFirst { // Only send QUEUED messages if they're the first one. This prevents us from @@ -573,7 +582,6 @@ func (s *server) periodicallyDeleteJobs() { for digest, job := range s.jobs { if shouldDeleteJob(job) { delete(s.jobs, digest) - currentRequests.Dec() } } s.mutex.Unlock()