diff --git a/ChangeLog b/ChangeLog index 7aa31517..1bfa7536 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.8.4 +-------------- + * delete in memory jobs periodically in one routine + Version 11.7.4 -------------- * Set limiter before context in elan client diff --git a/VERSION b/VERSION index 6e009551..53bd22a5 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.7.4 +11.8.4 diff --git a/mettle/api/api.go b/mettle/api/api.go index 8fce6a3a..e4d5aaf4 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -4,7 +4,6 @@ package api import ( "context" "fmt" - "math/rand" "net" "strings" "sync" @@ -98,6 +97,12 @@ var preResponsePublishDurations = prometheus.NewHistogram(prometheus.HistogramOp Buckets: prometheus.DefBuckets, }) +var deleteJobsDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "mettle", + Name: "delete_jobs_durations", + Buckets: prometheus.DefBuckets, +}) + var metrics = []prometheus.Collector{ totalRequests, currentRequests, @@ -109,6 +114,7 @@ var metrics = []prometheus.Collector{ requestPublishFailure, responsePublishFailure, preResponsePublishDurations, + deleteJobsDurations, } func init() { @@ -152,14 +158,15 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, // The subscription url is made up of the response queue url and the response queue suffix subscriptionURL := queueOpts.ResponseQueue + queueOpts.ResponseQueueSuffix srv := &server{ - name: name, - requests: common.MustOpenTopic(queueOpts.RequestQueue), - responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize), - preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue), - jobs: map[string]*job{}, - platform: allowedPlatform, - client: client, - numPollers: queueOpts.NumPollers, + name: name, + requests: common.MustOpenTopic(queueOpts.RequestQueue), + responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize), + preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue), + jobs: map[string]*job{}, + platform: allowedPlatform, + client: client, + numPollers: queueOpts.NumPollers, + deleteJobsTicker: time.NewTicker(10 * time.Minute), } log.Notice("Allowed platform values:") for k, v := range allowedPlatform { @@ -172,11 +179,10 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, srv.jobs = jobs currentRequests.Set(float64(len(srv.jobs))) log.Notice("Updated server with %d inflight executions", len(srv.jobs)) - for id := range jobs { - go srv.expireJob(id) - } } go srv.Receive() + go srv.periodicallyDeleteJobs() + defer srv.deleteJobsTicker.Stop() lis, s := grpcutil.NewServer(opts) pb.RegisterCapabilitiesServer(s, srv) @@ -187,15 +193,16 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string, type server struct { bpb.UnimplementedBootstrapServer - name string - client *client.Client - requests *pubsub.Topic - responses *pubsub.Subscription - preResponses *pubsub.Topic - jobs map[string]*job - platform map[string][]string - mutex sync.Mutex - numPollers int + name string + client *client.Client + requests *pubsub.Topic + responses *pubsub.Subscription + preResponses *pubsub.Topic + jobs map[string]*job + platform map[string][]string + mutex sync.Mutex + numPollers int + deleteJobsTicker *time.Ticker } // ServeExecutions serves a list of currently executing jobs over GRPC. @@ -308,7 +315,6 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ // We didn't create a new execution, so don't need to send a request for a new build. return s.streamEvents(req.ActionDigest, ch, stream) } - go s.expireJob(req.ActionDigest.Hash) // Dispatch a pre-emptive response message to let our colleagues know we've queued it. // We will also receive & forward this message. b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil) @@ -554,55 +560,34 @@ func (s *server) process(msg *pubsub.Message) { timeToComplete.Observe(j.LastUpdate.Sub(j.StartTime).Seconds()) } log.Info("Job %s completed by %s", key, worker) - go s.deleteJob(key, j) } } } -// deleteJob waits for a period then removes the given job from memory. -func (s *server) deleteJob(hash string, j *job) { - time.Sleep(retentionTime + time.Duration(rand.Int63n(int64(retentionTime)))) - s.mutex.Lock() - defer s.mutex.Unlock() - // Check the action hasn't been replaced since deleteJob was called - if s.jobs[hash] == j { - log.Notice("Removing job %s", hash) - delete(s.jobs, hash) - currentRequests.Dec() - } -} - -// expireJob expires an action that hasn't progressed. -func (s *server) expireJob(hash string) { - time.Sleep(expiryTime + time.Duration(rand.Int63n(int64(expiryTime)))) - if s.maybeExpireJob(hash, false) { - return +func (s *server) periodicallyDeleteJobs() { + for range s.deleteJobsTicker.C { + startTime := time.Now() + s.mutex.Lock() + for digest, job := range s.jobs { + if shouldDeleteJob(job) { + delete(s.jobs, digest) + currentRequests.Dec() + } + } + s.mutex.Unlock() + deleteJobsDurations.Observe(time.Since(startTime).Seconds()) } - time.Sleep(expiryTime + time.Duration(rand.Int63n(int64(expiryTime)))) - s.maybeExpireJob(hash, true) } -// maybeExpireJob checks a single job and expires it if nobody is waiting for an update, -// or expires it regardless if force=true. -// It returns true if the job was expired. -func (s *server) maybeExpireJob(hash string, force bool) bool { - s.mutex.Lock() - defer s.mutex.Unlock() - if j, present := s.jobs[hash]; !present { +func shouldDeleteJob(j *job) bool { + timeSinceLastUpdate := time.Since(j.LastUpdate) + if j.Done && timeSinceLastUpdate > retentionTime { return true - } else if len(j.Streams) == 0 { - if j.Done { - log.Debug("Expiring completed job %s", hash) - } else { - log.Warning("Expiring job %s with no listeners", hash) - } - delete(s.jobs, hash) - currentRequests.Dec() + } + if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime { return true - } else if force { - log.Warning("Force expiring job %s with %d listeners", hash, len(j.Streams)) - delete(s.jobs, hash) - currentRequests.Dec() + } + if !j.Done && timeSinceLastUpdate > 2*expiryTime { return true } return false diff --git a/mettle/api/api_test.go b/mettle/api/api_test.go index a5936dfb..ef9f90ec 100644 --- a/mettle/api/api_test.go +++ b/mettle/api/api_test.go @@ -6,6 +6,7 @@ import ( "io" "os" "testing" + "time" pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" "github.com/golang/protobuf/proto" @@ -197,6 +198,70 @@ func TestExecuteAndWaitAfterCompletion(t *testing.T) { checkExitCode(t, op2, 0) } +func TestShouldDeleteJob(t *testing.T) { + now := time.Now() + var tests = []struct { + name string + job *job + shouldDelete bool + }{ + { + name: "incomplete job returns false", + job: &job{ + Done: false, + LastUpdate: now.Add(-1 * time.Minute), + }, + shouldDelete: false, + }, + { + name: "completed job within retention time returns false", + job: &job{ + Done: true, + LastUpdate: now.Add(-1 * time.Minute), + }, + shouldDelete: false, + }, + { + name: "completed job after retention time returns true", + job: &job{ + Done: true, + LastUpdate: now.Add(-6 * time.Minute), + }, + shouldDelete: true, + }, + { + name: "incomplete job with no listeners within expiry time returns false", + job: &job{ + Done: false, + LastUpdate: now.Add(-59 * time.Minute), + }, + shouldDelete: false, + }, + { + name: "incomplete job with no listeners after expiry time returns true", + job: &job{ + Done: false, + LastUpdate: now.Add(-61 * time.Minute), + }, + shouldDelete: true, + }, + { + name: "incomplete job with listeners after 2x expiry time returns true", + job: &job{ + Done: false, + LastUpdate: now.Add(-121 * time.Minute), + }, + shouldDelete: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job)) + }) + } +} + func runExecution(t *testing.T, client pb.ExecutionClient, ex *executor, hash string, expectedExitCode int) { stream, err := client.Execute(context.Background(), &pb.ExecuteRequest{ ActionDigest: &pb.Digest{Hash: hash},