Skip to content

Commit

Permalink
periodically delete jobs in Mettle api server
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamishpk committed Feb 20, 2024
1 parent bfd2910 commit 1cebaac
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 62 deletions.
100 changes: 38 additions & 62 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package api
import (
"context"
"fmt"
"math/rand"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -152,14 +151,15 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
// The subscription url is made up of the response queue url and the response queue suffix
subscriptionURL := queueOpts.ResponseQueue + queueOpts.ResponseQueueSuffix
srv := &server{
name: name,
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: queueOpts.NumPollers,
name: name,
requests: common.MustOpenTopic(queueOpts.RequestQueue),
responses: common.MustOpenSubscription(subscriptionURL, queueOpts.SubscriptionBatchSize),
preResponses: common.MustOpenTopic(queueOpts.PreResponseQueue),
jobs: map[string]*job{},
platform: allowedPlatform,
client: client,
numPollers: queueOpts.NumPollers,
deleteJobsTicker: time.NewTicker(10 * time.Minute),
}
log.Notice("Allowed platform values:")
for k, v := range allowedPlatform {
Expand All @@ -172,11 +172,10 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,
srv.jobs = jobs
currentRequests.Set(float64(len(srv.jobs)))
log.Notice("Updated server with %d inflight executions", len(srv.jobs))
for id := range jobs {
go srv.expireJob(id)
}
}
go srv.Receive()
go srv.periodicallyDeleteJobs()
defer srv.deleteJobsTicker.Stop()

lis, s := grpcutil.NewServer(opts)
pb.RegisterCapabilitiesServer(s, srv)
Expand All @@ -187,15 +186,16 @@ func serve(opts grpcutil.Opts, name string, queueOpts PubSubOpts, apiURL string,

// server holds the state shared by the API server's gRPC handlers and its
// background goroutines: the pub/sub queues it talks to and the in-memory
// table of executions it is tracking.
type server struct {
	bpb.UnimplementedBootstrapServer

	// name is the server name passed to serve.
	name string

	// client is the client handed to serve.
	// NOTE(review): its use is not visible in this part of the file.
	client *client.Client

	// requests is the topic opened on the configured request queue.
	requests *pubsub.Topic

	// responses is the subscription opened on the response queue URL plus
	// the response queue suffix.
	responses *pubsub.Subscription

	// preResponses is the topic opened on the configured pre-response queue.
	preResponses *pubsub.Topic

	// jobs maps an action digest hash to the execution tracked for it.
	// Access must be guarded by mutex.
	jobs map[string]*job

	// platform holds the allowed platform values, keyed by property name.
	platform map[string][]string

	// mutex guards jobs.
	mutex sync.Mutex

	// numPollers is taken from the queue options.
	// NOTE(review): presumably the number of goroutines polling responses - confirm.
	numPollers int

	// deleteJobsTicker drives periodicallyDeleteJobs; serve creates it and
	// stops it when it returns.
	deleteJobsTicker *time.Ticker
}

// ServeExecutions serves a list of currently executing jobs over GRPC.
Expand Down Expand Up @@ -308,7 +308,6 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
// We didn't create a new execution, so don't need to send a request for a new build.
return s.streamEvents(req.ActionDigest, ch, stream)
}
go s.expireJob(req.ActionDigest.Hash)
// Dispatch a pre-emptive response message to let our colleagues know we've queued it.
// We will also receive & forward this message.
b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil)
Expand Down Expand Up @@ -554,55 +553,32 @@ func (s *server) process(msg *pubsub.Message) {
timeToComplete.Observe(j.LastUpdate.Sub(j.StartTime).Seconds())
}
log.Info("Job %s completed by %s", key, worker)
go s.deleteJob(key, j)
}
}
}

// deleteJob sleeps for somewhere between one and two retention periods and
// then drops the job stored under hash, but only if that entry is still j.
func (s *server) deleteJob(hash string, j *job) {
	jitter := time.Duration(rand.Int63n(int64(retentionTime)))
	time.Sleep(retentionTime + jitter)

	s.mutex.Lock()
	defer s.mutex.Unlock()
	if current := s.jobs[hash]; current != j {
		// The entry was replaced while we slept; it is no longer ours to remove.
		return
	}
	log.Notice("Removing job %s", hash)
	delete(s.jobs, hash)
	currentRequests.Dec()
}

// expireJob expires an action that hasn't progressed.
func (s *server) expireJob(hash string) {
time.Sleep(expiryTime + time.Duration(rand.Int63n(int64(expiryTime))))
if s.maybeExpireJob(hash, false) {
return
// periodicallyDeleteJobs sweeps the job table each time deleteJobsTicker
// fires, removing every job that shouldDeleteJob reports as removable and
// decrementing currentRequests once per removed job. It keeps looping for as
// long as the ticker's channel delivers ticks, so callers run it in its own
// goroutine.
func (s *server) periodicallyDeleteJobs() {
	for range s.deleteJobsTicker.C {
		// Hold the mutex for the whole sweep so that ranging over the map
		// cannot race with other goroutines modifying it. The lock is released
		// explicitly rather than with defer: a defer here would only run when
		// this function returns, which it does not do between ticks, so the
		// next Lock would block forever.
		s.mutex.Lock()
		for digest, j := range s.jobs {
			if shouldDeleteJob(j) {
				// Deleting entries while ranging over a map is permitted in Go.
				delete(s.jobs, digest)
				currentRequests.Dec()
			}
		}
		s.mutex.Unlock()
	}
}

// maybeExpireJob checks a single job and expires it if nobody is waiting for an update,
// or expires it regardless if force=true.
// It returns true if the job was expired.
func (s *server) maybeExpireJob(hash string, force bool) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
if j, present := s.jobs[hash]; !present {
func shouldDeleteJob(j *job) bool {
timeSinceLastUpdate := time.Since(j.LastUpdate)
if j.Done && timeSinceLastUpdate > retentionTime {
return true
} else if len(j.Streams) == 0 {
if j.Done {
log.Debug("Expiring completed job %s", hash)
} else {
log.Warning("Expiring job %s with no listeners", hash)
}
delete(s.jobs, hash)
currentRequests.Dec()
}
if !j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > expiryTime {
return true
} else if force {
log.Warning("Force expiring job %s with %d listeners", hash, len(j.Streams))
delete(s.jobs, hash)
currentRequests.Dec()
}
if !j.Done && timeSinceLastUpdate > 2*expiryTime {
return true
}
return false
Expand Down
65 changes: 65 additions & 0 deletions mettle/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"os"
"testing"
"time"

pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -197,6 +198,70 @@ func TestExecuteAndWaitAfterCompletion(t *testing.T) {
checkExitCode(t, op2, 0)
}

// TestShouldDeleteJob checks shouldDeleteJob against jobs of differing
// completion state and age. Ages are expressed relative to retentionTime and
// expiryTime so the cases keep testing the intended boundaries if those
// values are tuned.
func TestShouldDeleteJob(t *testing.T) {
	now := time.Now()
	// ago returns the instant d before now.
	ago := func(d time.Duration) time.Time { return now.Add(-d) }

	// NOTE(review): no case sets the job's Streams, so presumably every job
	// here has zero listeners. The "with listeners" case therefore does not
	// exercise a job that actually has listeners - TODO add one.
	tests := []struct {
		name         string
		job          *job
		shouldDelete bool
	}{
		{
			name:         "incomplete job returns false",
			job:          &job{Done: false, LastUpdate: ago(expiryTime / 2)},
			shouldDelete: false,
		},
		{
			name:         "completed job within retention time returns false",
			job:          &job{Done: true, LastUpdate: ago(retentionTime / 2)},
			shouldDelete: false,
		},
		{
			name:         "completed job after retention time returns true",
			job:          &job{Done: true, LastUpdate: ago(retentionTime + time.Minute)},
			shouldDelete: true,
		},
		{
			name:         "incomplete job with no listeners within expiry time returns false",
			job:          &job{Done: false, LastUpdate: ago(expiryTime - time.Minute)},
			shouldDelete: false,
		},
		{
			name:         "incomplete job with no listeners after expiry time returns true",
			job:          &job{Done: false, LastUpdate: ago(expiryTime + time.Minute)},
			shouldDelete: true,
		},
		{
			name:         "incomplete job with listeners after 2x expiry time returns true",
			job:          &job{Done: false, LastUpdate: ago(2*expiryTime + time.Minute)},
			shouldDelete: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			assert.Equal(t, test.shouldDelete, shouldDeleteJob(test.job))
		})
	}
}

func runExecution(t *testing.T, client pb.ExecutionClient, ex *executor, hash string, expectedExitCode int) {
stream, err := client.Execute(context.Background(), &pb.ExecuteRequest{
ActionDigest: &pb.Digest{Hash: hash},
Expand Down

0 comments on commit 1cebaac

Please sign in to comment.