diff --git a/ChangeLog b/ChangeLog index 20c9486..3a292df 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.12.0 +--------------- + * Add a new flag for connectivity checks to an external server + Version 11.11.1 --------------- * Add keepalive config to gRPC dial parameters diff --git a/VERSION b/VERSION index ef772f7..a098073 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.11.1 +11.12.0 diff --git a/mettle/BUILD b/mettle/BUILD index 1ed132b..50e63c3 100644 --- a/mettle/BUILD +++ b/mettle/BUILD @@ -16,7 +16,7 @@ go_binary( sh_cmd( name = "run_local", - cmd = f"mkdir -p plz-out/mettle && exec $(out_location :mettle) dual --host {CONFIG.LOCAL_HOST} --port 7778 -s 127.0.0.1:7777 -s file://\\\\$PWD/plz-out/elan -s 127.0.0.1:7772 -d plz-out/mettle --log_file plz-out/log/mettle.log --browser {CONFIG.BROWSER_URL} --sandbox $(out_location //sandbox:sandbox) --alt_sandbox $(out_location //sandbox:alt_sandbox) --admin_host 127.0.0.1 --token_file grpcutil/token.txt --redis_url 127.0.0.1:6379 --redis_password_file redis/auth --lucidity grpc://127.0.0.1:7774 --cache_dir plz-out/mettle-cache --cache_prefix third_party --memory_threshold 95.0 --version_file VERSION --cost cpu:£0.02 --cost mem:£0.01 --allowed_platform OSFamily:linux --allowed_platform OSFamily:macos --allowed_platform ISA:x86-64", + cmd = f"mkdir -p plz-out/mettle && exec $(out_location :mettle) dual --host {CONFIG.LOCAL_HOST} --port 7778 -s 127.0.0.1:7777 -s file://\\\\$PWD/plz-out/elan -s 127.0.0.1:7772 -d plz-out/mettle --log_file plz-out/log/mettle.log --browser {CONFIG.BROWSER_URL} --sandbox $(out_location //sandbox:sandbox) --alt_sandbox $(out_location //sandbox:alt_sandbox) --admin_host 127.0.0.1 --token_file grpcutil/token.txt --redis_url 127.0.0.1:6379 --redis_password_file redis/auth --lucidity grpc://127.0.0.1:7774 --cache_dir plz-out/mettle-cache --cache_prefix third_party --memory_threshold 95.0 --version_file VERSION --cost cpu:£0.02 --cost mem:£0.01 --allowed_platform OSFamily:linux --allowed_platform OSFamily:macos --allowed_platform ISA:x86-64 --connectivity_check gstatic", data = [ ":mettle", "//sandbox", diff --git a/mettle/main.go b/mettle/main.go index a8cbb05..bf474f0 100644 --- a/mettle/main.go +++ b/mettle/main.go @@ -60,6 +60,8 @@ var opts = struct { Timeout flags.Duration `long:"timeout" hidden:"true" description:"Deprecated, has no effect."` MinDiskSpace flags.ByteSize `long:"min_disk_space" default:"1G" description:"Don't accept builds unless at least this much disk space is available"` MemoryThreshold float64 `long:"memory_threshold" default:"100.0" description:"Don't accept builds unless available memory is under this percentage"` + ConnCheck string `long:"connectivity_check" choice:"gstatic" choice:"firefox" description:"Run an HTTP connectivity check periodically to verify if HTTP access is working"` + ConnCheckPeriod flags.Duration `long:"connectivity_check_period" default:"1h" description:"Periodicity to re-check connectivity at"` VersionFile string `long:"version_file" description:"File containing version tag"` Costs map[string]cli.Currency `long:"cost" description:"Per-second costs to associate with each build action."` ImmediateShutdown bool `long:"immediate_shutdown" description:"True if the worker should shut down immediately on a sigterm."` @@ -84,6 +86,8 @@ var opts = struct { Timeout flags.Duration `long:"timeout" hidden:"true" description:"Deprecated, has no effect."` MinDiskSpace flags.ByteSize `long:"min_disk_space" default:"1G" description:"Don't accept builds unless at least this much disk space is available"` MemoryThreshold float64 `long:"memory_threshold" default:"100.0" description:"Don't accept builds unless available memory is under this percentage"` + ConnCheck string `long:"connectivity_check" choice:"gstatic" choice:"firefox" description:"Run an HTTP connectivity check periodically to verify if HTTP access is working"` + ConnCheckPeriod flags.Duration `long:"connectivity_check_period" default:"1h" description:"Periodicity to re-check connectivity at"` VersionFile string `long:"version_file" description:"File containing version tag"` Costs map[string]cli.Currency `long:"cost" description:"Per-second costs to associate with each build action."` Cache CacheOpts `group:"Options controlling caching" namespace:"cache"` @@ -171,12 +175,12 @@ func main() { } for i := 0; i < opts.Dual.NumWorkers; i++ { storage := opts.Dual.Storage.Storage[i%len(opts.Dual.Storage.Storage)] - go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) + go worker.RunForever(opts.InstanceName, requests+"?ackdeadline=10m", responses, fmt.Sprintf("%s-%d", opts.InstanceName, i), storage, opts.Dual.Dir, opts.Dual.Cache.Dir, opts.Dual.Browser, opts.Dual.Sandbox, opts.Dual.AltSandbox, opts.Dual.Lucidity, "", opts.Dual.GRPC.TokenFile, primaryRedis, readRedis, opts.Dual.Redis.MaxSize, opts.Dual.Cache.Prefix, opts.Dual.Cache.Part, !opts.Dual.NoClean, opts.Dual.Storage.TLS, int64(opts.Dual.Cache.MaxMem), int64(opts.Dual.MinDiskSpace), opts.Dual.MemoryThreshold, opts.Dual.ConnCheck, time.Duration(opts.Dual.ConnCheckPeriod), opts.Dual.VersionFile, opts.Dual.Costs, 0, opts.Worker.ImmediateShutdown) } api.ServeForever(opts.Dual.GRPC, "", queues, "", false, opts.Dual.AllowedPlatform, opts.Dual.Storage.Storage[0], opts.Dual.Storage.TLS) } else if cmd == "worker" { primaryRedis, readRedis := opts.Worker.Redis.Clients() - worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) + worker.RunForever(opts.InstanceName, opts.Worker.Queues.RequestQueue, opts.Worker.Queues.ResponseQueue, opts.Worker.Name, opts.Worker.Storage.Storage, opts.Worker.Dir, opts.Worker.Cache.Dir, opts.Worker.Browser, opts.Worker.Sandbox, opts.Worker.AltSandbox, opts.Worker.Lucidity, opts.Worker.PromGateway, opts.Worker.Storage.TokenFile, primaryRedis, readRedis, opts.Worker.Redis.MaxSize, opts.Worker.Cache.Prefix, opts.Worker.Cache.Part, !opts.Worker.NoClean, opts.Worker.Storage.TLS, int64(opts.Worker.Cache.MaxMem), int64(opts.Worker.MinDiskSpace), opts.Worker.MemoryThreshold, opts.Worker.ConnCheck, time.Duration(opts.Worker.ConnCheckPeriod), opts.Worker.VersionFile, opts.Worker.Costs, time.Duration(opts.Worker.Queues.AckExtension), opts.Worker.ImmediateShutdown) } else if cmd == "api" { api.ServeForever(opts.API.GRPC, opts.API.Queues.ResponseQueueSuffix, opts.API.Queues, opts.API.API.URL, opts.API.API.TLS, opts.API.AllowedPlatform, opts.API.Storage.Storage, opts.API.Storage.TLS) } else if err := one(); err != nil { diff --git a/mettle/worker/reporting.go b/mettle/worker/reporting.go index e3c9793..13d3f29 100644 --- a/mettle/worker/reporting.go +++ b/mettle/worker/reporting.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "net/http" "syscall" "time" @@ -167,3 +168,26 @@ func (w *worker) checkLiveConnection() bool { } return true } + +// checkConnectivity tries to check connectivity to an external checker. It dies if it doesn't work. +func (w *worker) checkConnectivity(check string) { + switch check { + case "gstatic": + if resp, err := http.Get("https://connectivitycheck.gstatic.com/generate_204"); err != nil { + log.Fatalf("Failed to complete connectivity check: %s", err) + } else if resp.StatusCode != http.StatusNoContent { + log.Fatalf("Connectivity check returned unexpected status: %s", resp.Status) + } + case "firefox": + if resp, err := http.Get("https://detectportal.firefox.com/canonical.html"); err != nil { + log.Fatalf("Failed to complete connectivity check: %s", err) + } else if resp.StatusCode != http.StatusOK { + log.Fatalf("Connectivity check returned unexpected status: %s", resp.Status) + } + case "": + return // no check + default: + log.Fatalf("unknown connectivity check type: %s", check) + } + log.Notice("%s connectivity check succeeded", check) +} diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 3f1da85..3fb7cf6 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -148,8 +148,8 @@ func init() { } // RunForever runs the worker, receiving jobs until terminated. -func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) { - err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension, immediateShutdown) +func RunForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) { + err := runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, connCheck, connCheckPeriod, versionFile, costs, ackExtension, immediateShutdown) log.Fatalf("Failed to run: %s", err) } @@ -184,7 +184,7 @@ func RunOne(instanceName, name, storage, dir, cacheDir, sandbox, altSandbox, tok return nil } -func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error { +func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile string, primaryRedis, readRedis *redis.Client, redisMaxSize int64, cachePrefix, cacheParts []string, clean, secureStorage bool, maxCacheSize, minDiskSpace int64, memoryThreshold float64, connCheck string, connCheckPeriod time.Duration, versionFile string, costs map[string]mettlecli.Currency, ackExtension time.Duration, immediateShutdown bool) error { w, err := initialiseWorker(instanceName, requestQueue, responseQueue, name, storage, dir, cacheDir, browserURL, sandbox, altSandbox, lucidity, promGatewayURL, tokenFile, primaryRedis, readRedis, redisMaxSize, cachePrefix, cacheParts, clean, secureStorage, maxCacheSize, minDiskSpace, memoryThreshold, versionFile, costs, ackExtension) if err != nil { return err @@ -210,12 +210,23 @@ func runForever(instanceName, requestQueue, responseQueue, name, storage, dir, c } } }() + w.checkConnectivity(connCheck) go w.periodicallyPushMetrics() defer w.metricTicker.Stop() + t := time.NewTicker(connCheckPeriod) + defer t.Stop() for { w.waitForFreeResources() w.waitForLiveConnection() w.waitIfDisabled() + + // Run the connectivity check if the period has expired + select { + case <-t.C: + w.checkConnectivity(connCheck) + default: + } + // Run an explicit GC to clear up after the last task; ideally we leave as much free as // possible for the subprocesses. runtime.GC()