Skip to content

Commit

Permalink
Add a pre-flight action to ensure workers are able to execute actions (
Browse files Browse the repository at this point in the history
…#325)

* Add pre-flight action

* workers in subdirs so they don't fight

* remove

* Version
  • Loading branch information
peterebden authored Nov 25, 2024
1 parent 218f36d commit 2fcec46
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 15 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.13.0
---------------
* Add a pre-flight action to ensure workers are succesfully able to execute actions (#325)

Version 11.12.0
---------------
* Add a new flag for connectivity checks to an external server
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.12.0
11.13.0
10 changes: 7 additions & 3 deletions mettle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package main
import (
"fmt"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"
Expand Down Expand Up @@ -51,6 +52,7 @@ var opts = struct {
Worker struct {
Dir string `short:"d" long:"dir" default:"." description:"Directory to run actions in"`
NoClean bool `long:"noclean" description:"Don't clean workdirs after actions complete"`
PreflightAction bool `long:"preflight_action" description:"Run a pre-flight test action before starting to process work"`
Name string `short:"n" long:"name" description:"Name of this worker"`
Browser string `long:"browser" env:"BROWSER_URL" description:"Base URL for browser service (only used to construct informational user messages"`
Lucidity string `long:"lucidity" description:"URL of Lucidity server to report to"`
Expand All @@ -76,8 +78,9 @@ var opts = struct {
} `command:"worker" description:"Start as a worker"`
Dual struct {
GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
Dir string `short:"d" long:"dir" default:"plz-out/mettle" description:"Directory to run actions in"`
Dir string `short:"d" long:"dir" default:"plz-out/mettle" description:"Base directory to run actions in"`
NoClean bool `long:"noclean" env:"METTLE_NO_CLEAN" description:"Don't clean workdirs after actions complete"`
PreflightAction bool `long:"preflight_action" description:"Run a pre-flight test action before starting to process work"`
NumWorkers int `short:"n" long:"num_workers" env:"METTLE_NUM_WORKERS" description:"Number of workers to run in parallel"`
Browser string `long:"browser" description:"Base URL for browser service (only used to construct informational user messages"`
Lucidity string `long:"lucidity" description:"URL of Lucidity server to report to"`
Expand Down Expand Up @@ -175,12 +178,13 @@ func main() {
}
for i := 0; i < opts.Dual.NumWorkers; i++ {
storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
dir := filepath.Join(opts.Dual.Dir, fmt.Sprintf("worker_%02d", i))
go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.PreflightAction, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
}
api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
} else if cmd == "worker" {
primaryRedis, readRedis := opts.Worker.Redis.Clients()
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.PreflightAction, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
} else if cmd == "api" {
api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
} else if err := one(); err != nil {
Expand Down
85 changes: 85 additions & 0 deletions mettle/worker/preflight.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package worker

import (
"fmt"
"time"

"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)

// runPreflightAction runs a simple known action to ensure we can process them correctly.
func (w *worker) runPreflightAction() error {
log.Notice("Preparing pre-flight action...")
const fileContents = "thirty-five ham and cheese sandwiches\n"
cmd := &pb.Command{
Arguments: []string{
"bash", "--noprofile", "--norc", "-c", "cp $SRCS $OUTS",
},
EnvironmentVariables: []*pb.Command_EnvironmentVariable{
{Name: "SRCS", Value: "in.txt"},
{Name: "OUTS", Value: "out.txt"},
},
OutputPaths: []string{
"out.txt",
},
}
input := &pb.Directory{
Files: []*pb.FileNode{
{
Name: "in.txt",
Digest: digest.NewFromBlob([]byte(fileContents)).ToProto(),
},
},
}
action := &pb.Action{
CommandDigest: mustNewDigestFromMessage(cmd),
InputRootDigest: mustNewDigestFromMessage(input),
DoNotCache: false, // We don't set DoNotCache to make sure we can write a request to the server
Timeout: durationpb.New(10 * time.Second),
}
req := &pb.ExecuteRequest{
SkipCacheLookup: true,
ActionDigest: mustNewDigestFromMessage(action),
}
if err := w.client.UploadIfMissing([]*uploadinfo.Entry{
uploadinfo.EntryFromBlob([]byte(fileContents)),
mustEntryFromProto(cmd),
mustEntryFromProto(input),
mustEntryFromProto(action),
}, []pb.Compressor_Value{
pb.Compressor_IDENTITY,
pb.Compressor_IDENTITY,
pb.Compressor_IDENTITY,
pb.Compressor_IDENTITY,
}); err != nil {
return err
}
log.Notice("Running pre-flight action...")
if resp := w.runTaskRequest(req); resp.Status.Code != 0 {
return fmt.Errorf("Failed to run pre-flight request: %s %s", resp.Status, resp.Message)
} else if resp.Result.ExitCode != 0 {
return fmt.Errorf("Failed to run pre-flight request: exit code %d: %s", resp.Result.ExitCode, resp.Message)
}
log.Notice("Pre-flight action run successfully!")
return nil
}

func mustEntryFromProto(msg proto.Message) *uploadinfo.Entry {
entry, err := uploadinfo.EntryFromProto(msg)
if err != nil {
panic(err)
}
return entry
}

func mustNewDigestFromMessage(msg proto.Message) *pb.Digest {
dg, err := digest.NewFromMessage(msg)
if err != nil {
panic(err)
}
return dg.ToProto()
}
34 changes: 23 additions & 11 deletions mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func init() {
}

// RunForever runs the worker, receiving jobs until terminated.
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, preflightAction, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
log.Fatalf("Failed to run: %s", err)
}

Expand Down Expand Up @@ -184,12 +184,19 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
return nil
}

func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
if err != nil {
return err
}
defer w.ShutdownQueues()

if preflightAction {
if err := w.runPreflightAction(); err != nil {
return err
}
}

ch := make(chan os.Signal, 2)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -488,14 +495,6 @@ func (w *worker) runTask(msg *pubsub.Message) *pb.ExecuteResponse {
defer cancel()
go w.extendAckDeadline(ctx, msg)
}
totalBuilds.Inc()
currentBuilds.WithLabelValues(w.instanceName).Inc()
defer currentBuilds.WithLabelValues(w.instanceName).Dec()
w.metadata = &pb.ExecutedActionMetadata{
Worker: w.name,
WorkerStartTimestamp: ptypes.TimestampNow(),
}
w.taskStartTime = time.Now()
req := &pb.ExecuteRequest{}
if err := proto.Unmarshal(msg.Body, req); err != nil {
log.Error("Badly serialised request: %s")
Expand All @@ -504,6 +503,19 @@ func (w *worker) runTask(msg *pubsub.Message) *pb.ExecuteResponse {
Status: status(err, codes.FailedPrecondition, "Badly serialised request: %s", err),
}
}
return w.runTaskRequest(req)
}

// runTaskRequest runs a task from a proto request
func (w *worker) runTaskRequest(req *pb.ExecuteRequest) *pb.ExecuteResponse {
totalBuilds.Inc()
currentBuilds.WithLabelValues(w.instanceName).Inc()
defer currentBuilds.WithLabelValues(w.instanceName).Dec()
w.metadata = &pb.ExecutedActionMetadata{
Worker: w.name,
WorkerStartTimestamp: ptypes.TimestampNow(),
}
w.taskStartTime = time.Now()
w.actionDigest = req.ActionDigest
log.Debug("Received task for action digest %s", w.actionDigest.Hash)
w.lastURL = w.actionURL()
Expand Down

0 comments on commit 2fcec46

Please sign in to comment.