diff --git a/ChangeLog b/ChangeLog
index 3a292df..714ed5f 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+Version 11.13.0
+---------------
+ * Add a pre-flight action to ensure workers are successfully able to execute actions (#325)
+
 Version 11.12.0
 ---------------
  * Add a new flag for connectivity checks to an external server
diff --git a/VERSION b/VERSION
index a098073..a03dc57 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-11.12.0
+11.13.0
diff --git a/mettle/main.go b/mettle/main.go
index 4e958dc..0539753 100644
--- a/mettle/main.go
+++ b/mettle/main.go
@@ -4,6 +4,7 @@ package main
 import (
 	"fmt"
 	"os"
+	"path/filepath"
 	"runtime"
 	"runtime/pprof"
 	"time"
@@ -51,6 +52,7 @@ var opts = struct {
 	Worker struct {
 		Dir string `short:"d" long:"dir" default:"." description:"Directory to run actions in"`
 		NoClean bool `long:"noclean" description:"Don't clean workdirs after actions complete"`
+		PreflightAction bool `long:"preflight_action" description:"Run a pre-flight test action before starting to process work"`
 		Name string `short:"n" long:"name" description:"Name of this worker"`
 		Browser string `long:"browser" env:"BROWSER_URL" description:"Base URL for browser service (only used to construct informational user messages"`
 		Lucidity string `long:"lucidity" description:"URL of Lucidity server to report to"`
@@ -76,8 +78,9 @@ var opts = struct {
 	} `command:"worker" description:"Start as a worker"`
 	Dual struct {
 		GRPC grpcutil.Opts `group:"Options controlling the gRPC server"`
-		Dir string `short:"d" long:"dir" default:"plz-out/mettle" description:"Directory to run actions in"`
+		Dir string `short:"d" long:"dir" default:"plz-out/mettle" description:"Base directory to run actions in"`
 		NoClean bool `long:"noclean" env:"METTLE_NO_CLEAN" description:"Don't clean workdirs after actions complete"`
+		PreflightAction bool `long:"preflight_action" description:"Run a pre-flight test action before starting to process work"`
 		NumWorkers int `short:"n" long:"num_workers" env:"METTLE_NUM_WORKERS" description:"Number of workers to run in parallel"`
 		Browser string `long:"browser" description:"Base URL for browser service (only used to construct informational user messages"`
 		Lucidity string `long:"lucidity" description:"URL of Lucidity server to report to"`
@@ -175,12 +178,13 @@ func main() {
 		}
 		for i := 0; i < opts.Dual.NumWorkers; i++ {
 			storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)]
-			go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
+			dir := filepath.Join(opts.Dual.Dir, fmt.Sprintf("worker_%02d", i))
+			go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.PreflightAction, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown)
 		}
 		api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS)
 	} else if cmd == "worker" {
 		primaryRedis, readRedis := opts.Worker.Redis.Clients()
-		worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
+		worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.PreflightAction, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown)
 	} else if cmd == "api" {
 		api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS)
 	} else if err := one(); err != nil {
diff --git a/mettle/worker/preflight.go b/mettle/worker/preflight.go
new file mode 100644
index 0000000..c219e22
--- /dev/null
+++ b/mettle/worker/preflight.go
@@ -0,0 +1,85 @@
+package worker
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
+	"github.com/bazelbuild/remote-apis-sdks/go/pkg/uploadinfo"
+	pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+// runPreflightAction runs a simple, known action to ensure the worker can execute actions correctly.
+func (w *worker) runPreflightAction() error {
+	log.Notice("Preparing pre-flight action...")
+	const fileContents = "thirty-five ham and cheese sandwiches\n"
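+	// The check is deliberately trivial: a bash command that copies a single
+	// input file to a single output, exercising input download, execution and
+	// output upload end to end.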
+	cmd := &pb.Command{
+		Arguments: []string{
+			"bash", "--noprofile", "--norc", "-c", "cp $SRCS $OUTS",
+		},
+		EnvironmentVariables: []*pb.Command_EnvironmentVariable{
+			{Name: "SRCS", Value: "in.txt"},
+			{Name: "OUTS", Value: "out.txt"},
+		},
+		OutputPaths: []string{
+			"out.txt",
+		},
+	}
+	input := &pb.Directory{
+		Files: []*pb.FileNode{
+			{
+				Name: "in.txt",
+				Digest: digest.NewFromBlob([]byte(fileContents)).ToProto(),
+			},
+		},
+	}
+	action := &pb.Action{
+		CommandDigest: mustNewDigestFromMessage(cmd),
+		InputRootDigest: mustNewDigestFromMessage(input),
+		DoNotCache: false, // Left false so the result is written back, checking that we can write to the server as well.
+		Timeout: durationpb.New(10 * time.Second),
+	}
+	req := &pb.ExecuteRequest{
+		SkipCacheLookup: true,
+		ActionDigest: mustNewDigestFromMessage(action),
+	}
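+	// Upload the input blob and the command/directory/action protos so the
+	// action can be fetched and executed exactly like a normal request.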
+	if err := w.client.UploadIfMissing([]*uploadinfo.Entry{
+		uploadinfo.EntryFromBlob([]byte(fileContents)),
+		mustEntryFromProto(cmd),
+		mustEntryFromProto(input),
+		mustEntryFromProto(action),
+	}, []pb.Compressor_Value{
+		pb.Compressor_IDENTITY,
+		pb.Compressor_IDENTITY,
+		pb.Compressor_IDENTITY,
+		pb.Compressor_IDENTITY,
+	}); err != nil {
+		return err
+	}
+	log.Notice("Running pre-flight action...")
+	if resp := w.runTaskRequest(req); resp.Status.Code != 0 {
+		return fmt.Errorf("Failed to run pre-flight request: %s %s", resp.Status, resp.Message)
+	} else if resp.Result.ExitCode != 0 {
+		return fmt.Errorf("Failed to run pre-flight request: exit code %d: %s", resp.Result.ExitCode, resp.Message)
+	}
+	log.Notice("Pre-flight action ran successfully!")
+	return nil
+}
+
+func mustEntryFromProto(msg proto.Message) *uploadinfo.Entry {
+	entry, err := uploadinfo.EntryFromProto(msg)
+	if err != nil {
+		panic(err)
+	}
+	return entry
+}
+
+func mustNewDigestFromMessage(msg proto.Message) *pb.Digest {
+	dg, err := digest.NewFromMessage(msg)
+	if err != nil {
+		panic(err)
+	}
+	return dg.ToProto()
+}
diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go
index f88130c..6707105 100644
--- a/mettle/worker/worker.go
+++ b/mettle/worker/worker.go
@@ -148,8 +148,8 @@ func init() {
 }
 
 // RunForever runs the worker, receiving jobs until terminated.
-func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
-	err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
+func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) {
+	err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, preflightAction, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown)
 	log.Fatalf("Failed to run: %s", err)
 }
 
@@ -184,12 +184,19 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok
 	return nil
 }
 
-func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
+func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, preflightAction, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error {
 	w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension)
 	if err != nil {
 		return err
 	}
 	defer w.ShutdownQueues()
+
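+	// If configured, run the pre-flight check before we start pulling real
+	// work off the queue, so a broken worker fails at startup.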
+	if preflightAction {
+		if err := w.runPreflightAction(); err != nil {
+			return err
+		}
+	}
+
 	ch := make(chan os.Signal, 2)
 	signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGABRT, syscall.SIGTERM)
 	ctx, cancel := context.WithCancel(context.Background())
@@ -488,14 +495,6 @@ func (w *worker) runTask(msg *pubsub.Message) *pb.ExecuteResponse {
 		defer cancel()
 		go w.extendAckDeadline(ctx, msg)
 	}
-	totalBuilds.Inc()
-	currentBuilds.WithLabelValues(w.instanceName).Inc()
-	defer currentBuilds.WithLabelValues(w.instanceName).Dec()
-	w.metadata = &pb.ExecutedActionMetadata{
-		Worker: w.name,
-		WorkerStartTimestamp: ptypes.TimestampNow(),
-	}
-	w.taskStartTime = time.Now()
 	req := &pb.ExecuteRequest{}
 	if err := proto.Unmarshal(msg.Body, req); err != nil {
 		log.Error("Badly serialised request: %s")
 		return &pb.ExecuteResponse{
 			Status: status(err, codes.FailedPrecondition, "Badly serialised request: %s", err),
 		}
 	}
@@ -504,6 +503,19 @@ func (w *worker) runTask(msg *pubsub.Message) *pb.ExecuteResponse {
+	return w.runTaskRequest(req)
+}
+
+// runTaskRequest runs a single task from an already-deserialised ExecuteRequest.
+func (w *worker) runTaskRequest(req *pb.ExecuteRequest) *pb.ExecuteResponse {
+	totalBuilds.Inc()
+	currentBuilds.WithLabelValues(w.instanceName).Inc()
+	defer currentBuilds.WithLabelValues(w.instanceName).Dec()
+	w.metadata = &pb.ExecutedActionMetadata{
+		Worker: w.name,
+		WorkerStartTimestamp: ptypes.TimestampNow(),
+	}
+	w.taskStartTime = time.Now()
 	w.actionDigest = req.ActionDigest
 	log.Debug("Received task for action digest %s", w.actionDigest.Hash)
 	w.lastURL = w.actionURL()