Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically delete jobs in Mettle api server #281

Merged
merged 8 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.8.4
--------------
* delete in memory jobs periodically in one routine

Version 11.7.4
--------------
* Set limiter before context in elan client
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.7.4
11.8.4
109 changes: 47 additions & 62 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package api
import (
"context"
"fmt"
"math/rand"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -98,6 +97,12 @@ var preResponsePublishDurations = prometheus.NewHistogram(prometheus.HistogramOp
Buckets: prometheus.DefBuckets,
})

var deleteJobsDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "mettle",
Name: "delete_jobs_durations",
Buckets: prometheus.DefBuckets,
})

var metrics = []prometheus.Collector{
totalRequests,
currentRequests,
Expand All @@ -109,6 +114,7 @@ var metrics = []prometheus.Collector{
requestPublishFailure,
responsePublishFailure,
preResponsePublishDurations,
deleteJobsDurations,
}

func init() {
Expand Down Expand Up @@ -152,14 +158,15 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
// The subscription url is made up of the response queue url and the response queue suffix
subscriptionURL := queueOpts.ResponseQueue + queueOpts.ResponseQueueSuffix
srv := &server{
name: name,
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: queueOpts.NumPollers,
name: name,
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: queueOpts.NumPollers,
deleteJobsTicker: time.NewTicker(10 * time.Minute),
}
log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
Expand All @@ -172,11 +179,10 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
srv.jobs = jobs
currentRequests.Set(float64(len(srv.jobs)))
log.Notice("Updated server with %d inflight executions", len(srv.jobs))
for id := range jobs {
go srv.expireJob(id)
}
}
go srv.Receive()
go srv.periodicallyDeleteJobs()
defer srv.deleteJobsTicker.Stop()

lis, s := grpcutil.NewServer(opts)
pb.RegisterCapabilitiesServer(s, srv)
Expand All @@ -187,15 +193,16 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,

type server struct {
bpb.UnimplementedBootstrapServer
name string
client *client.Client
requests *pubsub.Topic
responses *pubsub.Subscription
preResponses *pubsub.Topic
jobs map[string]*job
platform map[string][]string
mutex sync.Mutex
numPollers int
name string
client *client.Client
requests *pubsub.Topic
responses *pubsub.Subscription
preResponses *pubsub.Topic
jobs map[string]*job
platform map[string][]string
mutex sync.Mutex
numPollers int
deleteJobsTicker *time.Ticker
}

// ServeExecutions serves a list of currently executing jobs over GRPC.
Expand Down Expand Up @@ -308,7 +315,6 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
// We didn't create a new execution, so don't need to send a request for a new build.
return s.streamEvents(req.ActionDigest, ch, stream)
}
go s.expireJob(req.ActionDigest.Hash)
// Dispatch a pre-emptive response message to let our colleagues know we've queued it.
// We will also receive & forward this message.
b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil)
Expand Down Expand Up @@ -554,55 +560,34 @@ func (s *server) process(msg *pubsub.Message) {
timeToComplete.Observe(j.LastUpdate.Sub(j.StartTime).Seconds())
}
log.Info("Job %s completed by %s", key, worker)
go s.deleteJob(key, j)
}
}
}

// deleteJob waits for a period then removes the given job from memory.
func (s *server) deleteJob(hash string, j *job) {
time.Sleep(retentionTime + time.Duration(rand.Int63n(int64(retentionTime))))
s.mutex.Lock()
defer s.mutex.Unlock()
// Check the action hasn't been replaced since deleteJob was called
if s.jobs[hash] == j {
log.Notice("Removing job %s", hash)
delete(s.jobs, hash)
currentRequests.Dec()
}
}

// expireJob expires an action that hasn't progressed.
func (s *server) expireJob(hash string) {
time.Sleep(expiryTime + time.Duration(rand.Int63n(int64(expiryTime))))
if s.maybeExpireJob(hash, false) {
return
func (s *server) periodicallyDeleteJobs() {
for range s.deleteJobsTicker.C {
startTime := time.Now()
s.mutex.Lock()
for digest, job := range s.jobs {
if shouldDeleteJob(job) {
delete(s.jobs, digest)
currentRequests.Dec()
}
}
s.mutex.Unlock()
deleteJobsDurations.Observe(time.Since(startTime).Seconds())
}
time.Sleep(expiryTime + time.Duration(rand.Int63n(int64(expiryTime))))
s.maybeExpireJob(hash, true)
}

// maybeExpireJob checks a single job and expires it if nobody is waiting for an update,
// or expires it regardless if force=true.
// It returns true if the job was expired.
func (s *server) maybeExpireJob(hash string, force bool) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
if j, present := s.jobs[hash]; !present {
func shouldDeleteJob(j *job) bool {
timeSinceLastUpdate := time.Since(j.LastUpdate)
if j.Done && timeSinceLastUpdate > retentionTime {
return true
} else if len(j.Streams) == 0 {
if j.Done {
log.Debug("Expiring completed job %s", hash)
} else {
log.Warning("Expiring job %s with no listeners", hash)
}
delete(s.jobs, hash)
currentRequests.Dec()
}
if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime {
return true
} else if force {
log.Warning("Force expiring job %s with %d listeners", hash, len(j.Streams))
delete(s.jobs, hash)
currentRequests.Dec()
}
if !j.Done && timeSinceLastUpdate > 2*expiryTime {
return true
}
return false
Expand Down
65 changes: 65 additions & 0 deletions mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"testing"
"time"

pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -197,6 +198,70 @@ func TestExecuteAndWaitAfterCompletion(t *testing.T) {
checkExitCode(t, op2, 0)
}

func TestShouldDeleteJob(t *testing.T) {
now := time.Now()
var tests = []struct {
name string
job *job
shouldDelete bool
}{
{
name: "incomplete job returns false",
job: &job{
Done: false,
LastUpdate: now.Add(-1 * time.Minute),
},
shouldDelete: false,
},
{
name: "completed job within retention time returns false",
job: &job{
Done: true,
LastUpdate: now.Add(-1 * time.Minute),
},
shouldDelete: false,
},
{
name: "completed job after retention time returns true",
job: &job{
Done: true,
LastUpdate: now.Add(-6 * time.Minute),
},
shouldDelete: true,
},
{
name: "incomplete job with no listeners within expiry time returns false",
job: &job{
Done: false,
LastUpdate: now.Add(-59 * time.Minute),
},
shouldDelete: false,
},
{
name: "incomplete job with no listeners after expiry time returns true",
job: &job{
Done: false,
LastUpdate: now.Add(-61 * time.Minute),
},
shouldDelete: true,
},
{
name: "incomplete job with listeners after 2x expiry time returns true",
job: &job{
Done: false,
LastUpdate: now.Add(-121 * time.Minute),
},
shouldDelete: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job))
})
}
}

func runExecution(t *testing.T, client pb.ExecutionClient, ex *executor, hash string, expectedExitCode int) {
stream, err := client.Execute(context.Background(), &pb.ExecuteRequest{
ActionDigest: &pb.Digest{Hash: hash},
Expand Down
Loading