Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor gauge strategy #284

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ var totalRequests = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Name: "requests_total",
})
var currentRequests = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "mettle",
Name: "requests_current",
})

var totalFailedActions = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "mettle",
Expand Down Expand Up @@ -105,7 +101,6 @@ var deleteJobsDurations = prometheus.NewHistogram(prometheus.HistogramOpts{

var metrics = []prometheus.Collector{
totalRequests,
currentRequests,
totalFailedActions,
totalSuccessfulActions,
timeToComplete,
Expand All @@ -117,6 +112,8 @@ var metrics = []prometheus.Collector{
deleteJobsDurations,
}

var register sync.Once

func init() {
for _, metric := range metrics {
prometheus.MustRegister(metric)
Expand Down Expand Up @@ -168,6 +165,21 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
numPollers: queueOpts.NumPollers,
deleteJobsTicker: time.NewTicker(10 * time.Minute),
}

// _Technically_ this won't happen more than once in normal running, as we'd only run 1 server, but it does happen in tests.
register.Do(func() {
prometheus.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "mettle",
Name: "requests_current",
},
func() float64 {
srv.mutex.Lock()
defer srv.mutex.Unlock()
return float64(len(srv.jobs))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could introduce a concurrent map read/write: the main logic may be modifying the `jobs` map at the same time that Prometheus tries to scrape the gauge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh. Yeah, brain did not engage on that one.

},
))
})

log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
log.Notice(" %s: %s", k, strings.Join(v, ", "))
Expand All @@ -177,7 +189,6 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
log.Warningf("Failed to get inflight executions: %s", err)
} else if len(jobs) > 0 {
srv.jobs = jobs
currentRequests.Set(float64(len(srv.jobs)))
log.Notice("Updated server with %d inflight executions", len(srv.jobs))
}
go srv.Receive()
Expand Down Expand Up @@ -404,7 +415,6 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
s.jobs[digest.Hash] = j
log.Debug("Created job for %s", digest.Hash)
totalRequests.Inc()
currentRequests.Inc()
created = true
} else if create && time.Since(j.LastUpdate) >= resumptionTime {
// In this path we think the job is too old to be relevant; we don't actually create
Expand Down Expand Up @@ -522,7 +532,6 @@ func (s *server) process(msg *pubsub.Message) {
LastUpdate: time.Now(),
}
s.jobs[key] = j
currentRequests.Inc()
}
if metadata.Stage != pb.ExecutionStage_QUEUED || !j.SentFirst {
// Only send QUEUED messages if they're the first one. This prevents us from
Expand Down Expand Up @@ -570,7 +579,6 @@ func (s *server) periodicallyDeleteJobs() {
for digest, job := range s.jobs {
if shouldDeleteJob(job) {
delete(s.jobs, digest)
currentRequests.Dec()
}
}
s.mutex.Unlock()
Expand Down
Loading