From 01bc3c81335c0dbdaef43169876c8036ce57aa82 Mon Sep 17 00:00:00 2001
From: Steven Hartland
Date: Sat, 27 Jul 2024 20:52:21 +0100
Subject: [PATCH] feat: shutdown race resilience

A significant rewrite to ensure that we don't suffer from shutdown race
conditions when the prune condition is met while additional resources are
still being created. Previously this could remove resources that were still
in use; now we retry if we detect that new resources were created within a
window of the prune condition triggering.

This supports the following new environment configuration settings:

- RYUK_REMOVE_RETRIES - The number of times to retry removing a resource.
- RYUK_REQUEST_TIMEOUT - The timeout for any Docker requests.
- RYUK_RETRY_OFFSET - The offset added to the start time of the prune pass
  that is used as the minimum resource creation time.
- RYUK_SHUTDOWN_TIMEOUT - The duration after shutdown has been requested
  when the remaining connections are ignored and prune checks start.

Update the README to correct the example, as health is only valid for
containers, not the other resource types, so it would cause failures.

Enable race detection on CI tests.
---
 .github/workflows/build-docker-image.yml |   2 +-
 .gitignore                               |   6 +
 README.md                                |  89 ++-
 config.go                                |  14 +
 config_test.go                           |  12 +
 consts.go                                |  18 +
 go.mod                                   |  55 +-
 go.sum                                   | 150 ++---
 interfaces.go                            |  25 +
 main.go                                  | 285 +--------
 main_test.go                             | 314 ----------
 mock_test.go                             |  67 ++
 reaper.go                                | 673 ++++++++++++++++++++
 reaper_test.go                           | 753 +++++++++++++++++++++++
 testdata/Dockerfile                      |   1 +
 15 files changed, 1722 insertions(+), 742 deletions(-)
 create mode 100644 consts.go
 create mode 100644 interfaces.go
 delete mode 100644 main_test.go
 create mode 100644 mock_test.go
 create mode 100644 reaper.go
 create mode 100644 reaper_test.go
 create mode 100644 testdata/Dockerfile

diff --git a/.github/workflows/build-docker-image.yml b/.github/workflows/build-docker-image.yml
index 18f483f..b00959b 100644
--- a/.github/workflows/build-docker-image.yml
+++ b/.github/workflows/build-docker-image.yml
@@ -30,7 +30,7 @@ jobs:
         run: go build

       - name: go-test
-        run: go test -v ./...
+        run: go test -race -v ./...

   test-windows:
     runs-on: windows-2022

diff --git a/.gitignore b/.gitignore
index dce3433..2f6d5c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,9 @@
 vendor/
 bin/
+
+# Binary
+moby-ryuk
+
+# VS Code
+.vscode

diff --git a/README.md b/README.md
index 306d021..909b41b 100644
--- a/README.md
+++ b/README.md
@@ -1,43 +1,80 @@
 # Moby Ryuk

-This project helps you to remove containers/networks/volumes/images by given filter after specified delay.
+This project helps you to remove containers, networks, volumes and images by a given filter after a specified delay.

-# Usage
+## Building

-1. Start it:
+To build just the binary, run:

-    $ RYUK_PORT=8080 ./bin/moby-ryuk
-    $ # You can also run it with Docker
-    $ docker run -v /var/run/docker.sock:/var/run/docker.sock -e RYUK_PORT=8080 -p 8080:8080 testcontainers/ryuk:0.9.0
+```shell
+go build
+```

-1. Connect via TCP:
+To build the Linux Docker image as the latest tag:

-    $ nc localhost 8080
+```shell
+docker build -f linux/Dockerfile -t testcontainers/ryuk:latest .
+```

-1. Send some filters:
+## Usage

-    label=testing=true&health=unhealthy
-    ACK
-    label=something
-    ACK
+To start it using the default settings:

-1. Close the connection
+```shell
+docker run -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 testcontainers/ryuk:latest
+```

-1. Send more filters with "one-off" style:

-    printf "label=something_else" | nc localhost 8080
+If you want to test local changes with the default settings:

-1. See containers/networks/volumes deleted after 10s:
+```shell
+go run .
+```

-    2018/01/15 18:38:52 Timed out waiting for connection
-    2018/01/15 18:38:52 Deleting {"label":{"something":true}}
-    2018/01/15 18:38:52 Deleting {"label":{"something_else":true}}
-    2018/01/15 18:38:52 Deleting {"health":{"unhealthy":true},"label":{"testing=true":true}}
-    2018/01/15 18:38:52 Removed 1 container(s), 0 network(s), 0 volume(s), 0 image(s)
+You can then simulate a connection from a testcontainers container using:
+```shell
+nc -N localhost 8080 << EOF
+label=testing=true&label=testing.sessionid=mysession
+label=something
+EOF
+```
+
+You can send additional session information for monitoring using:
+
+```shell
+printf "label=something_else" | nc -N localhost 8080
+```
+
+In the Ryuk window you'll see containers, networks and volumes deleted after 10s:
+
+```log
+time=2024-09-30T19:42:30.000+01:00 level=INFO msg=starting connection_timeout=1m0s reconnection_timeout=10s request_timeout=10s shutdown_timeout=10m0s remove_retries=10 retry_offset=-1s port=8080 verbose=false
+time=2024-09-30T19:42:30.001+01:00 level=INFO msg=listening address=[::]:8080
+time=2024-09-30T19:42:30.001+01:00 level=INFO msg="client processing started"
+time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client connected" address=127.0.0.1:56432 clients=1
+time=2024-09-30T19:42:38.002+01:00 level=INFO msg="adding filter" type=label values="[testing=true testing.sessionid=mysession]"
+time=2024-09-30T19:42:38.002+01:00 level=INFO msg="adding filter" type=label values=[something]
+time=2024-09-30T19:42:38.002+01:00 level=INFO msg="client disconnected" address=127.0.0.1:56432 clients=0
+time=2024-09-30T19:42:42.047+01:00 level=INFO msg="adding filter" type=label values=[something_else]
+time=2024-09-30T19:42:42.047+01:00 level=INFO msg="client connected" address=127.0.0.1:56434 clients=1
+time=2024-09-30T19:42:42.047+01:00 level=INFO msg="client disconnected" address=127.0.0.1:56434 clients=0
+time=2024-09-30T19:42:52.051+01:00 level=INFO msg="prune check" clients=0
+time=2024-09-30T19:42:52.216+01:00 level=INFO msg="client processing stopped"
+time=2024-09-30T19:42:52.216+01:00 level=INFO msg=removed containers=0 networks=0 volumes=0 images=0
+time=2024-09-30T19:42:52.216+01:00 level=INFO msg=done
+```

 ## Ryuk configuration

-- `RYUK_CONNECTION_TIMEOUT` - Environment variable that defines the timeout for Ryuk to receive the first connection (default: 60s). Value layout is described in [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) documentation.
-- `RYUK_PORT` - Environment variable that defines the port where Ryuk will be bound to (default: 8080).
-- `RYUK_RECONNECTION_TIMEOUT` - Environment variable that defines the timeout for Ryuk to reconnect to Docker (default: 10s). Value layout is described in [time.ParseDuration](https://golang.org/pkg/time/#ParseDuration) documentation.
-- `RYUK_VERBOSE` - Environment variable that defines if Ryuk should print debug logs (default: false).
+The following environment variables can be configured to change the behaviour:
+
+| Environment Variable        | Default | Format  | Description  |
+| --------------------------- | ------- | ------- | ------------ |
+| `RYUK_CONNECTION_TIMEOUT`   | `60s`   | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration without receiving any connections which will trigger a shutdown |
+| `RYUK_PORT`                 | `8080`  | `uint16` | The port to listen on for connections |
+| `RYUK_RECONNECTION_TIMEOUT` | `10s`   | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after the last connection closes which will trigger resource clean up and shutdown |
+| `RYUK_REQUEST_TIMEOUT`      | `10s`   | [Duration](https://golang.org/pkg/time/#ParseDuration) | The timeout for any Docker requests |
+| `RYUK_REMOVE_RETRIES`       | `10`    | `int`   | The number of times to retry removing a resource |
+| `RYUK_RETRY_OFFSET`         | `-1s`   | [Duration](https://golang.org/pkg/time/#ParseDuration) | The offset added to the start time of the prune pass that is used as the minimum resource creation time. Any resource created after this calculated time will trigger a retry to ensure in-use resources are not removed |
+| `RYUK_VERBOSE`              | `false` | `bool`  | Whether to enable verbose (debug) logging |
+| `RYUK_SHUTDOWN_TIMEOUT`     | `10m`   | [Duration](https://golang.org/pkg/time/#ParseDuration) | The duration after shutdown has been requested when the remaining connections are ignored and prune checks start |
diff --git a/config.go b/config.go
index 2ae1556..d594747 100644
--- a/config.go
+++ b/config.go
@@ -17,6 +17,17 @@ type config struct {
 	// resource clean up and shutdown.
 	ReconnectionTimeout time.Duration `env:"RYUK_RECONNECTION_TIMEOUT" envDefault:"10s"`

+	// RequestTimeout is the timeout for any Docker requests.
+	RequestTimeout time.Duration `env:"RYUK_REQUEST_TIMEOUT" envDefault:"10s"`
+
+	// RemoveRetries is the number of times to retry removing a resource.
+	RemoveRetries int `env:"RYUK_REMOVE_RETRIES" envDefault:"10"`
+
+	// RetryOffset is the offset added to the start time of the prune pass that is
+	// used as the minimum resource creation time. Any resource created after this
+	// calculated time will trigger a retry to ensure in-use resources are not removed.
+	RetryOffset time.Duration `env:"RYUK_RETRY_OFFSET" envDefault:"-1s"`
+
 	// ShutdownTimeout is the maximum amount of time the reaper will wait
 	// for once signalled to shutdown before it terminates even if connections
 	// are still established.
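As a rough sketch of how these new settings fit together at runtime, Ryuk could be started with them set explicitly. The variable names and formats come from the table and struct above; the values here are arbitrary illustrations, not recommendations:

```shell
docker run -v /var/run/docker.sock:/var/run/docker.sock -p 8080:8080 \
  -e RYUK_REQUEST_TIMEOUT=30s \
  -e RYUK_REMOVE_RETRIES=5 \
  -e RYUK_RETRY_OFFSET=-2s \
  -e RYUK_SHUTDOWN_TIMEOUT=5m \
  testcontainers/ryuk:latest
```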
@@ -34,7 +45,10 @@ func (c config) LogAttrs() []slog.Attr {
 	return []slog.Attr{
 		slog.Duration("connection_timeout", c.ConnectionTimeout),
 		slog.Duration("reconnection_timeout", c.ReconnectionTimeout),
+		slog.Duration("request_timeout", c.RequestTimeout),
 		slog.Duration("shutdown_timeout", c.ShutdownTimeout),
+		slog.Int("remove_retries", c.RemoveRetries),
+		slog.Duration("retry_offset", c.RetryOffset),
 		slog.Int("port", int(c.Port)),
 		slog.Bool("verbose", c.Verbose),
 	}
diff --git a/config_test.go b/config_test.go
index e0ef6b4..43f847f 100644
--- a/config_test.go
+++ b/config_test.go
@@ -34,6 +34,9 @@ func Test_loadConfig(t *testing.T) {
 		ConnectionTimeout: time.Minute,
 		ReconnectionTimeout: time.Second * 10,
 		ShutdownTimeout: time.Minute * 10,
+		RemoveRetries: 10,
+		RequestTimeout: time.Second * 10,
+		RetryOffset: -time.Second,
 	}

 	cfg, err := loadConfig()
@@ -47,6 +50,9 @@
 	t.Setenv("RYUK_RECONNECTION_TIMEOUT", "3s")
 	t.Setenv("RYUK_SHUTDOWN_TIMEOUT", "7s")
 	t.Setenv("RYUK_VERBOSE", "true")
+	t.Setenv("RYUK_REQUEST_TIMEOUT", "4s")
+	t.Setenv("RYUK_REMOVE_RETRIES", "5")
+	t.Setenv("RYUK_RETRY_OFFSET", "-6s")

 	expected := config{
 		Port: 1234,
@@ -54,6 +60,9 @@
 		ReconnectionTimeout: time.Second * 3,
 		ShutdownTimeout: time.Second * 7,
 		Verbose: true,
+		RemoveRetries: 5,
+		RequestTimeout: time.Second * 4,
+		RetryOffset: -time.Second * 6,
 	}

 	cfg, err := loadConfig()
@@ -67,6 +76,9 @@
 		"RYUK_RECONNECTION_TIMEOUT",
 		"RYUK_SHUTDOWN_TIMEOUT",
 		"RYUK_VERBOSE",
+		"RYUK_REQUEST_TIMEOUT",
+		"RYUK_REMOVE_RETRIES",
+		"RYUK_RETRY_OFFSET",
 	} {
 		t.Run("invalid-"+name, func(t *testing.T) {
 			t.Setenv(name, "invalid")
diff --git a/consts.go b/consts.go
new file mode 100644
index 0000000..625d8c8
--- /dev/null
+++ b/consts.go
@@ -0,0 +1,18 @@
+package main
+
+const (
+	// labelBase is the base label for testcontainers.
+	labelBase = "org.testcontainers"
+
+	// ryukLabel is the label used to identify reaper containers.
+	ryukLabel = labelBase + ".ryuk"
+
+	// fieldError is the log field key for errors.
+	fieldError = "error"
+
+	// fieldAddress is the log field for a client or listening address.
+	fieldAddress = "address"
+
+	// fieldClients is the log field used for client counts.
+ fieldClients = "clients" +) diff --git a/go.mod b/go.mod index 4fa4551..bc456de 100644 --- a/go.mod +++ b/go.mod @@ -6,20 +6,13 @@ require ( github.com/caarlos0/env/v11 v11.2.2 github.com/docker/docker v27.2.0+incompatible github.com/stretchr/testify v1.9.0 - github.com/testcontainers/testcontainers-go v0.33.0 - gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e ) require ( - dario.cat/mergo v1.0.0 // indirect - github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect + github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect github.com/containerd/log v0.1.0 // indirect - github.com/containerd/platforms v0.2.1 // indirect - github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect github.com/docker/go-connections v0.5.0 // indirect @@ -27,17 +20,13 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-ole/go-ole v1.2.6 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/klauspost/compress v1.17.4 // indirect - github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect - github.com/magiconair/properties v1.8.7 // indirect - github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 // indirect + github.com/klauspost/compress v1.17.10 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/patternmatcher v0.6.0 // indirect - github.com/moby/sys/sequential v0.5.0 // indirect - github.com/moby/sys/user v0.1.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.3.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -45,25 +34,21 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect - github.com/shirou/gopsutil/v3 v3.23.12 // indirect - github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/tklauser/go-sysconf v0.3.12 // indirect - github.com/tklauser/numcpus v0.6.1 // indirect - github.com/yusufpapurcu/wmi v1.2.3 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel v1.29.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 // indirect - go.opentelemetry.io/otel/metric v1.29.0 // indirect - go.opentelemetry.io/otel/sdk v1.19.0 // indirect - go.opentelemetry.io/otel/trace v1.29.0 // indirect - golang.org/x/crypto v0.24.0 // indirect - golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect - google.golang.org/genproto/googleapis/rpc 
v0.0.0-20240318140521-94a12d6c2237 // indirect - google.golang.org/protobuf v1.33.0 // indirect + github.com/stretchr/objx v0.5.2 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect + go.opentelemetry.io/otel v1.28.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 // indirect + go.opentelemetry.io/otel/metric v1.28.0 // indirect + go.opentelemetry.io/otel/sdk v1.28.0 // indirect + go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sys v0.22.0 // indirect + golang.org/x/time v0.5.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f // indirect + google.golang.org/grpc v1.65.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + gotest.tools/v3 v3.5.1 // indirect ) diff --git a/go.sum b/go.sum index 5cd15f0..1089cad 100644 --- a/go.sum +++ b/go.sum @@ -1,23 +1,16 @@ -dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= -dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3Kg= github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc= -github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= -github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= -github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927/go.mod h1:h/aW8ynjgkuj+NQRlZcDbAbM1ORAbXjXX77sX7T289U= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= -github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= -github.com/containerd/platforms v0.2.1/go.mod h1:XHCb+2/hzowdiut9rkudds9bE5yJ7npe7dG/wG+uFPw= -github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= -github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty 
v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -38,40 +31,33 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= -github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= +github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= -github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= -github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= -github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 h1:JAEbJn3j/FrhdWA9jW8B5ajsLIjeuEHLi8xE4fk997o= 
-github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= -github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= -github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= -github.com/moby/sys/user v0.1.0 h1:WmZ93f5Ux6het5iituh9x2zAG7NFY9Aqi49jjE1PaQg= -github.com/moby/sys/user v0.1.0/go.mod h1:fKJhFOnsCN6xZ5gSfbM6zaHGgDJMrqt9/reuj4T7MmU= +github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= +github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= @@ -82,93 +68,67 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= -github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= -github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= -github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= -github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11DgpE4= -github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= -github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= -github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= -github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= -github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sirupsen/logrus v1.9.3 
h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/testcontainers/testcontainers-go v0.33.0 h1:zJS9PfXYT5O0ZFXM2xxXfk4J5UMw/kRiISng037Gxdw= -github.com/testcontainers/testcontainers-go v0.33.0/go.mod h1:W80YpTa8D5C3Yy16icheD01UTDu+LmXIA2Keo+jWtT8= -github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= -github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= -github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= -github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= -github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= -go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= -go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0 h1:IeMeyr1aBvBiPVYihXIaeIZba6b8E1bYp7lbdxK8CQg= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.19.0/go.mod h1:oVdCUtjq9MK9BlS7TtucsQwUcXcymNiEDjgDD2jMtZU= -go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= -go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= -go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= -go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= -go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= -go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= -go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= 
-go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 h1:4K4tsIXefpVJtvA/8srF4V4y0akAoPHkIslgAkjixJA= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0/go.mod h1:jjdQuTGVsXV4vSs+CJ2qYDeDPf9yIJV23qlIzBm73Vg= +go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= +go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 h1:3Q/xZUyC1BBkualc9ROb4G8qkH90LXEIICcs5zv1OYY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0/go.mod h1:s75jGIWA9OfCMzF0xr+ZgfrB5FEbbV7UuYo32ahUiFI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0 h1:j9+03ymgYhPKmeXGk5Zu+cIZOlVzd9Zv7QIiyItjFBU= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.28.0/go.mod h1:Y5+XiUG4Emn1hTfciPzGPJaSI+RpDts6BnCIir0SLqk= +go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= +go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= +go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= +go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= -golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys 
v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= +golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= -golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= @@ -177,19 +137,17 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 h1:RFiFrvy37/mpSpdySBDrUdipW/dHwsRwh3J3+A9VgT4= -google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.mod h1:Z5Iiy3jtmioajWHDGFk7CeugTyHtPvMHA4UTmUkyalE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:NnYq6UN9ReLM9/Y01KWNOWyI5xQ9kbIms5GGJVwS/Yc= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= -google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA= -google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= 
-google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 h1:0+ozOGcrp+Y8Aq8TLNN2Aliibms5LEzsq99ZZmAGYm0= +google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094/go.mod h1:fJ/e3If/Q67Mj99hin0hMhiNyCRmt6BQ2aWIJshUSJw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f h1:RARaIm8pxYuxyNPbBQf5igT7XdOyCNtat1qAT2ZxjU4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240725223205-93522f1f2a9f/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= -gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e h1:bJHzu9Qwc9wQRWJ/WVkJGAfs+riucl/tKAFNxf9pzqk= -gopkg.in/matryer/try.v1 v1.0.0-20150601225556-312d2599e12e/go.mod h1:tve0rTLdGlwnXF7iBO9rbAEyeXvuuPx0n4DvXS/Nw7o= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/interfaces.go b/interfaces.go new file mode 100644 index 0000000..44048d6 --- /dev/null +++ b/interfaces.go @@ -0,0 +1,25 @@ +package main + +import ( + "context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" +) + +// dockerClient is an interface that represents the reapers required Docker methods. +type dockerClient interface { + ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) + ContainerRemove(ctx context.Context, containerID string, options container.RemoveOptions) error + ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) + ImageRemove(ctx context.Context, imageID string, options image.RemoveOptions) ([]image.DeleteResponse, error) + NetworkList(ctx context.Context, options network.ListOptions) ([]network.Summary, error) + NetworkRemove(ctx context.Context, networkID string) error + VolumeList(ctx context.Context, options volume.ListOptions) (volume.ListResponse, error) + VolumeRemove(ctx context.Context, volumeID string, force bool) error + Ping(ctx context.Context) (types.Ping, error) + NegotiateAPIVersion(ctx context.Context) +} diff --git a/main.go b/main.go index bcb433f..6a8f591 100644 --- a/main.go +++ b/main.go @@ -1,290 +1,35 @@ +// Runs a container reaper that listens for connections and prunes resources based on the filters received. 
package main import ( - "bufio" "context" - "errors" "fmt" - "io" - "log" - "net" - "net/url" + "log/slog" + "os" "os/signal" - "strings" - "sync" "syscall" - "time" - - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/client" - "gopkg.in/matryer/try.v1" -) - -const ( - ryukLabel string = "org.testcontainers.ryuk" -) - -var ( - port uint16 - connectionTimeout time.Duration - reconnectionTimeout time.Duration - verbose bool ) -func main() { - cfg, err := loadConfig() - if err != nil { - panic(err) - } - - port = cfg.Port - connectionTimeout = cfg.ConnectionTimeout - reconnectionTimeout = cfg.ReconnectionTimeout - verbose = cfg.Verbose - - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - panic(err) - } - - cli.NegotiateAPIVersion(context.Background()) - - pingCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - log.Println("Pinging Docker...") - _, err = cli.Ping(pingCtx) - if err != nil { - panic(err) - } - - log.Println("Docker daemon is available!") - - deathNote := sync.Map{} - - // Buffered so we don't block the main process. - connectionAccepted := make(chan net.Addr, 10) - connectionLost := make(chan net.Addr, 10) - - go processRequests(&deathNote, connectionAccepted, connectionLost) - +// run creates and runs a reaper which is cancelled when a signal is received. +func run() error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - waitForPruneCondition(ctx, connectionAccepted, connectionLost) - - dc, dn, dv, di := prune(cli, &deathNote) - log.Printf("Removed %d container(s), %d network(s), %d volume(s) %d image(s)", dc, dn, dv, di) -} - -func processRequests(deathNote *sync.Map, connectionAccepted chan<- net.Addr, connectionLost chan<- net.Addr) { - log.Printf("Starting on port %d...", port) - - ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + r, err := newReaper(ctx) if err != nil { - panic(err) + return fmt.Errorf("new reaper: %w", err) } - log.Println("Started!") - for { - conn, err := ln.Accept() - if err != nil { - panic(err) - } - - connectionAccepted <- conn.RemoteAddr() - - go func(conn net.Conn) { - defer conn.Close() - defer func() { connectionLost <- conn.RemoteAddr() }() - - reader := bufio.NewReader(conn) - for { - message, err := reader.ReadString('\n') - - message = strings.TrimSpace(message) - - if len(message) > 0 { - query, err := url.ParseQuery(message) - - if err != nil { - log.Println(err) - continue - } - - args := filters.NewArgs() - for filterType, values := range query { - for _, value := range values { - args.Add(filterType, value) - } - } - paramBytes, err := args.MarshalJSON() - - if err != nil { - log.Println(err) - continue - } - param := string(paramBytes) - - log.Printf("Adding %s", param) - - deathNote.Store(param, true) - - _, _ = conn.Write([]byte("ACK\n")) - } - - if err != nil { - if !errors.Is(err, io.EOF) { - log.Println(err) - } - return - } - } - }(conn) + if err = r.run(ctx); err != nil { + return fmt.Errorf("run: %w", err) } -} -func waitForPruneCondition(ctx context.Context, connectionAccepted <-chan net.Addr, connectionLost <-chan net.Addr) { - connectionCount := 0 - timer := time.NewTimer(connectionTimeout) - for { - select { - case addr := <-connectionAccepted: - log.Printf("New client connected: %s", addr) - connectionCount++ - if connectionCount == 1 { - if !timer.Stop() { - <-timer.C - } - } - case addr := <-connectionLost: - 
log.Printf("Client disconnected: %s", addr.String()) - connectionCount-- - if connectionCount == 0 { - timer.Reset(reconnectionTimeout) - } - case <-ctx.Done(): - log.Println("Signal received") - return - case <-timer.C: - log.Println("Timeout waiting for connection") - return - } - } + return nil } -func prune(cli *client.Client, deathNote *sync.Map) (deletedContainers int, deletedNetworks int, deletedVolumes int, deletedImages int) { - deletedContainersMap := make(map[string]bool) - deletedNetworksMap := make(map[string]bool) - deletedVolumesMap := make(map[string]bool) - deletedImagesMap := make(map[string]bool) - - deathNote.Range(func(note, _ interface{}) bool { - param := fmt.Sprint(note) - if verbose { - log.Printf("Deleting %s\n", param) - } - - args, err := filters.FromJSON(param) - if err != nil { - log.Println(err) - return true - } - - containerListOpts := container.ListOptions{All: true, Filters: args} - if verbose { - log.Printf("Listing containers with filter: %#v\n", containerListOpts) - } - - if containers, err := cli.ContainerList(context.Background(), containerListOpts); err != nil { - log.Println(err) - } else { - containerRemoveOpts := container.RemoveOptions{RemoveVolumes: true, Force: true} - - for _, container := range containers { - value, isReaper := container.Labels[ryukLabel] - if isReaper && value == "true" { - continue - } - - if verbose { - log.Printf("Deleting containers with filter: %#v\n", containerRemoveOpts) - } - _ = cli.ContainerRemove(context.Background(), container.ID, containerRemoveOpts) - deletedContainersMap[container.ID] = true - } - } - - _ = try.Do(func(attempt int) (bool, error) { - if verbose { - log.Printf("Deleting networks with filter: %#v. (Attempt %d/%d)\n", args, attempt, 10) - } - - networksPruneReport, err := cli.NetworksPrune(context.Background(), args) - for _, networkID := range networksPruneReport.NetworksDeleted { - deletedNetworksMap[networkID] = true - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Network pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - _ = try.Do(func(attempt int) (bool, error) { - argsClone := args.Clone() - - // The API version >= v1.42 prunes only anonymous volumes: https://github.com/moby/moby/releases/tag/v23.0.0. - if serverVersion, err := cli.ServerVersion(context.Background()); err == nil && serverVersion.APIVersion >= "1.42" { - argsClone.Add("all", "true") - } - - if verbose { - log.Printf("Deleting volumes with filter: %#v. (Attempt %d/%d)\n", argsClone, attempt, 10) - } - - volumesPruneReport, err := cli.VolumesPrune(context.Background(), argsClone) - for _, volumeName := range volumesPruneReport.VolumesDeleted { - deletedVolumesMap[volumeName] = true - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Volumes pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - _ = try.Do(func(attempt int) (bool, error) { - argsClone := args.Clone() - argsClone.Add("dangling", "false") - - if verbose { - log.Printf("Deleting images with filter: %#v. 
(Attempt %d/%d)\n", argsClone, attempt, 10) - } - - imagesPruneReport, err := cli.ImagesPrune(context.Background(), argsClone) - for _, image := range imagesPruneReport.ImagesDeleted { - if image.Untagged != "" { - deletedImagesMap[image.Untagged] = true - } - } - shouldRetry := attempt < 10 - if err != nil && shouldRetry { - log.Printf("Images pruning has failed, retrying(%d/%d). The error was: %v", attempt, 10, err) - time.Sleep(1 * time.Second) - } - return shouldRetry, err - }) - - return true - }) - - deletedContainers = len(deletedContainersMap) - deletedNetworks = len(deletedNetworksMap) - deletedVolumes = len(deletedVolumesMap) - deletedImages = len(deletedImagesMap) - return +func main() { + if err := run(); err != nil { + slog.Error("run", fieldError, err) + os.Exit(1) + } } diff --git a/main_test.go b/main_test.go deleted file mode 100644 index 8de2ca2..0000000 --- a/main_test.go +++ /dev/null @@ -1,314 +0,0 @@ -package main - -import ( - "archive/tar" - "bytes" - "context" - "fmt" - "io" - "log" - "net" - "os" - "path/filepath" - "sync" - "testing" - "time" - - "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/volume" - "github.com/docker/docker/client" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" - "github.com/testcontainers/testcontainers-go/network" -) - -var addr = &net.TCPAddr{ - IP: net.IPv4zero, - Port: 5555, - Zone: "", -} - -var testConnectionTimeout time.Duration = 5 * time.Second - -func init() { - reconnectionTimeout = 1 * time.Second -} - -func TestReconnectionTimeout(t *testing.T) { - // reset connectionTimeout - connectionTimeout = testConnectionTimeout - - acc := make(chan net.Addr) - lost := make(chan net.Addr) - - done := make(chan struct{}) - - go func() { - waitForPruneCondition(context.Background(), acc, lost) - done <- struct{}{} - }() - - acc <- addr - lost <- addr - - select { - case <-done: - return - case <-time.After(2 * time.Second): - t.Fail() - } -} - -func TestInitialTimeout(t *testing.T) { - // reset connectionTimeout - connectionTimeout = testConnectionTimeout - - origWriter := log.Default().Writer() - defer func() { - log.SetOutput(origWriter) - }() - var buf bytes.Buffer - log.SetOutput(&buf) - - acc := make(chan net.Addr) - lost := make(chan net.Addr) - done := make(chan string) - - go func() { - waitForPruneCondition(context.Background(), acc, lost) - done <- buf.String() - }() - - select { - case p := <-done: - require.Contains(t, p, "Timeout waiting for connection") - case <-time.After(7 * time.Second): - t.Fatal("Timeout waiting prune condition") - } -} - -func TestPrune(t *testing.T) { - tcCli, err := testcontainers.NewDockerClientWithOpts(context.Background(), client.FromEnv) - tcCli.NegotiateAPIVersion(context.Background()) - - cli := tcCli.Client - - if err == nil { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - _, err = cli.Ping(ctx) - } - - if err != nil { - t.Fatal(err) - } - cli.NegotiateAPIVersion(context.Background()) - - maxLength := 25 - - t.Run("Empty death note", func(t *testing.T) { - deathNote := &sync.Map{} - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Malformed death note", func(t *testing.T) { - deathNote := &sync.Map{} - deathNote.Store("param", true) - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - 
assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Malformed JSON death note", func(t *testing.T) { - deathNote := &sync.Map{} - deathNote.Store(`{"label": "color"}`, true) - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing containers", func(t *testing.T) { - const label = "removable-container" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "nginx:alpine", - Labels: map[string]string{ - label: "true", - }, - SkipReaper: true, - }, - Started: true, - }) - require.Nil(t, err) - require.NotNil(t, c) - t.Cleanup(func() { - require.Error(t, c.Terminate(ctx), "container should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, maxLength, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note skips reaper container itself", func(t *testing.T) { - const label = "removable-container" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "nginx:alpine", - Labels: map[string]string{ - label: "true", - "org.testcontainers.ryuk": "true", - }, - SkipReaper: true, - }, - Started: true, - }) - require.Nil(t, err) - require.NotNil(t, c) - - dc, _, _, _ := prune(cli, deathNote) - assert.Equal(t, 0, dc) - - err = c.Terminate(ctx) - require.Nil(t, err) - }) - - t.Run("Death note removing networks", func(t *testing.T) { - const label = "removable-network" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - nw, err := network.New(ctx, network.WithLabels(map[string]string{label: "true"})) - require.Nil(t, err) - require.NotNil(t, nw) - t.Cleanup(func() { - require.Error(t, nw.Remove(ctx), "network should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, maxLength, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing volumes", func(t *testing.T) { - const label = "removable-volume" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := context.Background() - for i := 0; i < maxLength; i++ { - opts := volume.CreateOptions{ - Name: fmt.Sprintf("volume-%d", i), - Labels: map[string]string{ - label: "true", - }, - } - - vol, err := cli.VolumeCreate(ctx, opts) - require.Nil(t, err) - require.NotNil(t, vol) - t.Cleanup(func() { - // force remove the volume, which does not fail if the volume was already removed - require.NoError(t, cli.VolumeRemove(ctx, vol.Name, true), "volume should have been removed") - }) - } - - dc, dn, dv, di := prune(cli, deathNote) - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, maxLength, dv) - assert.Equal(t, 0, di) - }) - - t.Run("Death note removing images", func(t *testing.T) { - const label = "removable-image" - deathNote := &sync.Map{} - deathNote.Store(`{"label": {"`+label+`=true": true}}`, true) - - ctx := 
context.Background() - for i := 0; i < maxLength; i++ { - buf := new(bytes.Buffer) - tw := tar.NewWriter(buf) - defer tw.Close() - - dockerFile := "Dockerfile" - dockerFileReader, err := os.Open(filepath.Join("testresources", dockerFile)) - require.Nil(t, err) - - readDockerFile, err := io.ReadAll(dockerFileReader) - require.Nil(t, err) - - tarHeader := &tar.Header{ - Name: dockerFile, - Size: int64(len(readDockerFile)), - } - err = tw.WriteHeader(tarHeader) - require.Nil(t, err) - - _, err = tw.Write(readDockerFile) - require.Nil(t, err) - dockerFileTarReader := bytes.NewReader(buf.Bytes()) - - opt := types.ImageBuildOptions{ - Remove: true, - ForceRemove: true, // removing containers produced by the build - Labels: map[string]string{ - label: "true", - "index": fmt.Sprintf("%d", i), - }, - Context: dockerFileTarReader, - Dockerfile: dockerFile, - Tags: []string{fmt.Sprintf("moby-ryuk:test-%d", i)}, // adding a tag so that image is not marked as 'dangling' - } - - response, err := cli.ImageBuild(ctx, dockerFileTarReader, opt) - require.Nil(t, err) - require.NotNil(t, response) - - // need to read the response from Docker before continuing the execution - buf = new(bytes.Buffer) - _, err = buf.ReadFrom(response.Body) - require.Nil(t, err) - - err = response.Body.Close() - require.Nil(t, err) - } - - dc, dn, dv, di := prune(cli, deathNote) - - assert.Equal(t, 0, dc) - assert.Equal(t, 0, dn) - assert.Equal(t, 0, dv) - assert.Equal(t, maxLength, di) - }) -} diff --git a/mock_test.go b/mock_test.go new file mode 100644 index 0000000..3d5a666 --- /dev/null +++ b/mock_test.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/stretchr/testify/mock" +) + +var _ dockerClient = (*mockClient)(nil) + +type mockClient struct { + mock.Mock +} + +func (c *mockClient) ContainerList(ctx context.Context, options container.ListOptions) ([]types.Container, error) { + args := c.Called(ctx, options) + return args.Get(0).([]types.Container), args.Error(1) +} + +func (c *mockClient) ContainerRemove(ctx context.Context, containerID string, options container.RemoveOptions) error { + args := c.Called(ctx, containerID, options) + return args.Error(0) +} + +func (c *mockClient) ImageList(ctx context.Context, options image.ListOptions) ([]image.Summary, error) { + args := c.Called(ctx, options) + return args.Get(0).([]image.Summary), args.Error(1) +} + +func (c *mockClient) ImageRemove(ctx context.Context, imageID string, options image.RemoveOptions) ([]image.DeleteResponse, error) { + args := c.Called(ctx, imageID, options) + return args.Get(0).([]image.DeleteResponse), args.Error(1) +} + +func (c *mockClient) NetworkList(ctx context.Context, options network.ListOptions) ([]network.Summary, error) { + args := c.Called(ctx, options) + return args.Get(0).([]network.Summary), args.Error(1) +} + +func (c *mockClient) NetworkRemove(ctx context.Context, networkID string) error { + args := c.Called(ctx, networkID) + return args.Error(0) +} + +func (c *mockClient) VolumeList(ctx context.Context, options volume.ListOptions) (volume.ListResponse, error) { + args := c.Called(ctx, options) + return args.Get(0).(volume.ListResponse), args.Error(1) +} + +func (c *mockClient) VolumeRemove(ctx context.Context, volumeID string, force bool) error { + args := c.Called(ctx, volumeID, force) + 
return args.Error(0)
+}
+
+func (c *mockClient) Ping(ctx context.Context) (types.Ping, error) {
+	args := c.Called(ctx)
+	return args.Get(0).(types.Ping), args.Error(1)
+}
+
+func (c *mockClient) NegotiateAPIVersion(ctx context.Context) {
+	c.Called(ctx)
+}
diff --git a/reaper.go b/reaper.go
new file mode 100644
index 0000000..712e5a9
--- /dev/null
+++ b/reaper.go
@@ -0,0 +1,673 @@
+package main
+
+import (
+	"bufio"
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"net"
+	"net/url"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/docker/docker/api/types/container"
+	"github.com/docker/docker/api/types/filters"
+	"github.com/docker/docker/api/types/image"
+	"github.com/docker/docker/api/types/network"
+	"github.com/docker/docker/api/types/volume"
+	"github.com/docker/docker/client"
+	"github.com/docker/docker/errdefs"
+)
+
+//nolint:gochecknoglobals // Reusable options are fine as globals.
+var (
+	// errChangesDetected is returned when changes are detected.
+	errChangesDetected = errors.New("changes detected")
+
+	// containerRemoveOptions are the options we use to remove a container.
+	containerRemoveOptions = container.RemoveOptions{RemoveVolumes: true, Force: true}
+
+	// imageRemoveOptions are the options we use to remove an image.
+	imageRemoveOptions = image.RemoveOptions{PruneChildren: true}
+
+	// volumeRemoveForce is the force option we use to remove a volume.
+	volumeRemoveForce = true
+
+	// ackResponse is the response we send to the client to acknowledge a filter.
+	ackResponse = []byte("ACK\n")
+)
+
+// reaper listens for connections and prunes resources based on the filters received
+// once a prune condition is met.
+type reaper struct {
+	client       dockerClient
+	listener     net.Listener
+	cfg          *config
+	connected    chan string
+	disconnected chan string
+	shutdown     chan struct{}
+	filters      map[string]filters.Args
+	logger       *slog.Logger
+	mtx          sync.Mutex
+}
+
+// reaperOption is a function that sets an option on a reaper.
+type reaperOption func(*reaper) error
+
+// withConfig returns a reaperOption that sets the configuration.
+// Default: loaded from the environment.
+func withConfig(cfg config) reaperOption {
+	return func(r *reaper) error {
+		r.cfg = &cfg
+		return nil
+	}
+}
+
+// withLogger returns a reaperOption that sets the logger.
+// If specified, the log level will not be changed by the
+// configuration, so it should be set to the desired level.
+// Default: A text handler to stdout with the log level
+// set by the configuration.
+func withLogger(logger *slog.Logger) reaperOption {
+	return func(r *reaper) error {
+		r.logger = logger
+		return nil
+	}
+}
+
+// withClient returns a reaperOption that sets the Docker client.
+// Default: A docker client created with options from the environment.
+func withClient(client dockerClient) reaperOption {
+	return func(r *reaper) error {
+		r.client = client
+		return nil
+	}
+}
+
+// newReaper creates a new reaper with the specified options.
+// Default options are used if not specified; see the individual
+// options for details.
+func newReaper(ctx context.Context, options ...reaperOption) (*reaper, error) {
+	logLevel := &slog.LevelVar{}
+	r := &reaper{
+		filters:   make(map[string]filters.Args),
+		connected: make(chan string), // Must be unbuffered to ensure correct behaviour.
+ disconnected: make(chan string), + shutdown: make(chan struct{}), + logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: logLevel, + })), + } + + for _, option := range options { + if err := option(r); err != nil { + return nil, fmt.Errorf("option: %w", err) + } + } + + var err error + if r.client == nil { + // Default client configured from the environment. + if r.client, err = client.NewClientWithOpts(client.FromEnv); err != nil { + return nil, fmt.Errorf("new client: %w", err) + } + } + + r.client.NegotiateAPIVersion(ctx) + + if r.cfg == nil { + // Default configuration loaded from the environment. + if r.cfg, err = loadConfig(); err != nil { + return nil, fmt.Errorf("load config: %w", err) + } + } + + if r.cfg.Verbose { + logLevel.Set(slog.LevelDebug) + } + + pingCtx, cancel := context.WithTimeout(ctx, r.cfg.RequestTimeout) + defer cancel() + + if _, err = r.client.Ping(pingCtx); err != nil { + return nil, fmt.Errorf("ping: %w", err) + } + + r.logger.LogAttrs(ctx, slog.LevelInfo, "starting", r.cfg.LogAttrs()...) + if r.listener, err = net.Listen("tcp", fmt.Sprintf(":%d", r.cfg.Port)); err != nil { + return nil, fmt.Errorf("listen: %w", err) + } + + r.logger.Info("listening", fieldAddress, r.listener.Addr().String()) + + return r, nil +} + +// run starts the reaper which prunes resources when: +// - Signalled by the context +// - No connections are received within the connection timeout +// - A connection is received and no further connections are received within the reconnection timeout +func (r *reaper) run(ctx context.Context) error { + defer r.logger.Info("done") + + // Process incoming connections. + go r.processClients() + + // Wait for all tasks to complete. + if err := r.pruner(ctx); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + + return err + } + + return nil +} + +// pruner waits for a prune condition to be triggered then runs a prune. +func (r *reaper) pruner(ctx context.Context) error { + var errs []error + resources, err := r.pruneWait(ctx) + if err != nil { + errs = append(errs, fmt.Errorf("prune wait: %w", err)) + } + + if err = r.prune(resources); err != nil { //nolint:contextcheck // Prune needs its own context to ensure clean up completes. + errs = append(errs, fmt.Errorf("prune: %w", err)) + } + + return errors.Join(errs...) +} + +// processClients listens for incoming connections and processes them. +func (r *reaper) processClients() { + r.logger.Info("client processing started") + defer r.logger.Info("client processing stopped") + + for { + conn, err := r.listener.Accept() + if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, net.ErrClosed) { + return + } + + r.logger.Error("accept", fieldError, err) + continue + } + + // Block waiting for the connection to be registered + // so that we prevent the race on connection count. + addr := conn.RemoteAddr().String() + select { + case r.connected <- addr: + case <-r.shutdown: + // We received a new connection after shutdown started. + // Closing without returning the ACK should trigger the caller + // to retry and get a new reaper. + r.logger.Warn("shutdown, aborting client", fieldAddress, addr) + conn.Close() + return + } + + go r.handle(conn) + } +} + +// handle processes a connection, reading session details from +// the client and adding them to our filter. 
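+// Filters arrive one per line in URL query encoding, for example
+// "label=testing=true&label=testing.sessionid=mysession"; each
+// non-empty line read is acknowledged with "ACK\n".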
+func (r *reaper) handle(conn net.Conn) {
+	addr := conn.RemoteAddr().String()
+	defer func() {
+		conn.Close()
+		r.disconnected <- addr
+	}()
+
+	logger := r.logger.With(fieldAddress, addr)
+
+	// Read filters from the client and add them to our list.
+	scanner := bufio.NewScanner(conn)
+	for scanner.Scan() {
+		msg := scanner.Text()
+
+		switch msg {
+		case "":
+			logger.Warn("empty filter received")
+			continue
+		default:
+			if err := r.addFilter(msg); err != nil {
+				logger.Error("add filter", fieldError, err)
+				if _, err := conn.Write(ackResponse); err != nil {
+					logger.Error("ack write", fieldError, err)
+				}
+				continue
+			}
+
+			if _, err := conn.Write(ackResponse); err != nil {
+				logger.Error("ack write", fieldError, err)
+			}
+		}
+	}
+
+	if err := scanner.Err(); err != nil {
+		logger.Error("scan", fieldError, err)
+	}
+}
+
+// resources represents the resources to prune.
+type resources struct {
+	containers []string
+	networks   []string
+	volumes    []string
+	images     []string
+}
+
+// shutdownListener ensures that the listener is shut down and no new clients
+// are accepted.
+func (r *reaper) shutdownListener() {
+	select {
+	case <-r.shutdown:
+		return // Already shut down.
+	default:
+		close(r.shutdown)
+		r.listener.Close()
+	}
+}
+
+// pruneWait waits for a prune condition to be met and returns the resources to prune.
+// It will retry if changes are detected.
+func (r *reaper) pruneWait(ctx context.Context) (*resources, error) {
+	defer r.shutdownListener()
+
+	clients := 0
+	pruneCheck := time.NewTicker(r.cfg.ConnectionTimeout)
+	done := ctx.Done()
+	for {
+		select {
+		case addr := <-r.connected:
+			clients++
+			r.logger.Info("client connected", fieldAddress, addr, fieldClients, clients)
+			if clients == 1 {
+				pruneCheck.Stop()
+			}
+		case addr := <-r.disconnected:
+			clients--
+			r.logger.Info("client disconnected", fieldAddress, addr, fieldClients, clients)
+			if clients == 0 && done != nil {
+				pruneCheck.Reset(r.cfg.ReconnectionTimeout)
+			}
+		case <-done:
+			r.logger.Info("signal received", fieldClients, clients, "shutdown_timeout", r.cfg.ShutdownTimeout)
+			// Force shutdown by closing the listener, scheduling
+			// a pruneCheck after a timeout and setting done
+			// to nil so we don't enter this case again.
+			r.shutdownListener()
+			timeout := r.cfg.ShutdownTimeout
+			if clients == 0 {
+				// No clients connected, shut down immediately.
+				timeout = time.Nanosecond
+			}
+
+			pruneCheck.Reset(timeout)
+			done = nil
+		case now := <-pruneCheck.C:
+			r.logger.Info("prune check", fieldClients, clients)
+
+			if clients > 0 {
+				r.logger.Warn("shutdown timeout", fieldClients, clients)
+			}
+
+			resources, err := r.resources(now.Add(r.cfg.RetryOffset)) //nolint:contextcheck // Needs its own context to ensure clean up completes.
+			if err != nil {
+				if errors.Is(err, errChangesDetected) {
+					r.logger.Warn("change detected, waiting again", fieldError, err)
+					continue
+				}
+
+				return resources, fmt.Errorf("resources: %w", err)
+			}
+
+			return resources, nil
+		}
+	}
+}
+
+// resources returns the resources that match the collected filters.
+func (r *reaper) resources(since time.Time) (*resources, error) {
+	var ret resources
+	var err error
+	var errs []error
+	// We combine errors so we can do best-effort removal.
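+	// An errChangesDetected from any resource type propagates up to
+	// pruneWait, which logs it and waits for the next prune check.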
+	for _, args := range r.filterArgs() {
+		if ret.containers, err = r.affectedContainers(since, args); err != nil {
+			if !errors.Is(err, errChangesDetected) {
+				r.logger.Error("affected containers", fieldError, err)
+			}
+			errs = append(errs, fmt.Errorf("affected containers: %w", err))
+		}
+
+		if ret.networks, err = r.affectedNetworks(since, args); err != nil {
+			if !errors.Is(err, errChangesDetected) {
+				r.logger.Error("affected networks", fieldError, err)
+			}
+			errs = append(errs, fmt.Errorf("affected networks: %w", err))
+		}
+
+		if ret.volumes, err = r.affectedVolumes(since, args); err != nil {
+			if !errors.Is(err, errChangesDetected) {
+				r.logger.Error("affected volumes", fieldError, err)
+			}
+			errs = append(errs, fmt.Errorf("affected volumes: %w", err))
+		}
+
+		if ret.images, err = r.affectedImages(since, args); err != nil {
+			if !errors.Is(err, errChangesDetected) {
+				r.logger.Error("affected images", fieldError, err)
+			}
+			errs = append(errs, fmt.Errorf("affected images: %w", err))
+		}
+	}
+
+	return &ret, errors.Join(errs...)
+}
+
+// affectedContainers returns a slice of container IDs that match the filters.
+// If a matching container was created after since, an error is returned.
+func (r *reaper) affectedContainers(since time.Time, args filters.Args) ([]string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
+	defer cancel()
+
+	options := container.ListOptions{All: true, Filters: args}
+	r.logger.Debug("listing containers", "filter", options)
+	containers, err := r.client.ContainerList(ctx, options)
+	if err != nil {
+		return nil, fmt.Errorf("container list: %w", err)
+	}
+
+	containerIDs := make([]string, 0, len(containers))
+	for _, container := range containers {
+		if container.Labels[ryukLabel] == "true" {
+			// Ignore reaper containers.
+			r.logger.Debug("skipping reaper container", "id", container.ID)
+			continue
+		}
+
+		created := time.Unix(container.Created, 0)
+		changed := created.After(since)
+
+		r.logger.Debug("found container",
+			"id", container.ID,
+			"image", container.Image,
+			"names", container.Names,
+			"ports", container.Ports,
+			"state", container.State,
+			"labels", container.Labels,
+			"created", created,
+			"changed", changed,
+			"since", since,
+		)
+
+		if changed {
+			// It's not safe to remove a container which was created after
+			// the prune was initiated, as this may lead to unexpected behaviour.
+			return nil, fmt.Errorf("container %s: %w", container.ID, errChangesDetected)
+		}
+
+		containerIDs = append(containerIDs, container.ID)
+	}
+
+	return containerIDs, nil
+}
+
+// affectedNetworks returns a list of network IDs that match the filters.
+// If a matching network was created after since, an error is returned.
+func (r *reaper) affectedNetworks(since time.Time, args filters.Args) ([]string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
+	defer cancel()
+
+	options := network.ListOptions{Filters: args}
+	r.logger.Debug("listing networks", "options", options)
+	report, err := r.client.NetworkList(ctx, options)
+	if err != nil {
+		return nil, fmt.Errorf("network list: %w", err)
+	}
+
+	networks := make([]string, 0, len(report))
+	for _, network := range report {
+		changed := network.Created.After(since)
+		r.logger.Debug("found network",
+			"id", network.ID,
+			"created", network.Created,
+			"changed", changed,
+			"since", since,
+		)
+
+		if changed {
+			// It's not safe to remove a network which was created after
+			// the prune was initiated, as this may lead to unexpected behaviour.
+			return nil, fmt.Errorf("network %s: %w", network.ID, errChangesDetected)
+		}
+
+		networks = append(networks, network.ID)
+	}
+
+	return networks, nil
+}
+
+// affectedVolumes returns a list of volume names that match the filters.
+// If a matching volume was created after since, an error is returned.
+func (r *reaper) affectedVolumes(since time.Time, args filters.Args) ([]string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
+	defer cancel()
+
+	options := volume.ListOptions{Filters: args}
+	r.logger.Debug("listing volumes", "filter", options)
+	report, err := r.client.VolumeList(ctx, options)
+	if err != nil {
+		return nil, fmt.Errorf("volume list: %w", err)
+	}
+
+	volumes := make([]string, 0, len(report.Volumes))
+	for _, volume := range report.Volumes {
+		created, perr := time.Parse(time.RFC3339, volume.CreatedAt)
+		if perr != nil {
+			// Best effort, log and continue.
+			r.logger.Error("parse volume created", fieldError, perr, "volume", volume.Name)
+			continue
+		}
+
+		changed := created.After(since)
+		r.logger.Debug("found volume",
+			"name", volume.Name,
+			"created", created,
+			"changed", changed,
+			"since", since,
+		)
+
+		if changed {
+			// It's not safe to remove a volume which was created after
+			// the prune was initiated, as this may lead to unexpected behaviour.
+			return nil, fmt.Errorf("volume %s: %w", volume.Name, errChangesDetected)
+		}
+
+		volumes = append(volumes, volume.Name)
+	}
+
+	return volumes, nil
+}
+
+// affectedImages returns a list of image IDs that match the filters.
+// If a matching image was created after since, an error is returned.
+func (r *reaper) affectedImages(since time.Time, args filters.Args) ([]string, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout)
+	defer cancel()
+
+	options := image.ListOptions{Filters: args}
+	r.logger.Debug("listing images", "filter", options)
+	report, err := r.client.ImageList(ctx, options)
+	if err != nil {
+		return nil, fmt.Errorf("image list: %w", err)
+	}
+
+	images := make([]string, 0, len(report))
+	for _, image := range report {
+		created := time.Unix(image.Created, 0)
+		changed := created.After(since)
+		r.logger.Debug("found image",
+			"id", image.ID,
+			"created", created,
+			"changed", changed,
+			"since", since,
+		)
+
+		if changed {
+			// It's not safe to remove an image which was created after
+			// the prune was initiated, as this may lead to unexpected behaviour.
+			return nil, fmt.Errorf("image %s: %w", image.ID, errChangesDetected)
+		}
+
+		images = append(images, image.ID)
+	}
+
+	return images, nil
+}
+
+// addFilter adds a filter to prune.
+// Safe to call concurrently.
+func (r *reaper) addFilter(msg string) error {
+	query, err := url.ParseQuery(msg)
+	if err != nil {
+		return fmt.Errorf("parse query: %w", err)
+	}
+
+	args := filters.NewArgs()
+	for filterType, values := range query {
+		r.logger.Info("adding filter", "type", filterType, "values", values)
+		for _, value := range values {
+			args.Add(filterType, value)
+		}
+	}
+
+	// We can't use msg as the map key, as its parameters could be in any
+	// order, so we use the JSON encoding of the parsed args instead.
+	data, err := args.MarshalJSON()
+	if err != nil {
+		return fmt.Errorf("marshal json: %w", err)
+	}
+
+	key := string(data)
+
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	if _, ok := r.filters[key]; ok {
+		r.logger.Debug("filter already exists", "key", key)
+		return nil
+	}
+
+	r.logger.Debug("adding filter", "args", args, "key", key)
+	r.filters[key] = args
+
+	return nil
+}
+
+// filterArgs returns a slice of filter.Args to check against.
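+// Each entry corresponds to one unique filter query received from a client.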
+// Safe to call concurrently. +func (r *reaper) filterArgs() []filters.Args { + r.mtx.Lock() + defer r.mtx.Unlock() + + filters := make([]filters.Args, 0, len(r.filters)) + for _, args := range r.filters { + filters = append(filters, args) + } + + return filters +} + +// prune removes the specified resources. +func (r *reaper) prune(resources *resources) error { + var containers, networks, volumes, images int + var errs []error + + // Containers must be removed first. + errs = append(errs, r.remove("container", resources.containers, &containers, func(ctx context.Context, id string) error { + return r.client.ContainerRemove(ctx, id, containerRemoveOptions) + })) + + // Networks. + errs = append(errs, r.remove("network", resources.networks, &networks, func(ctx context.Context, id string) error { + return r.client.NetworkRemove(ctx, id) + })) + + // Volumes. + errs = append(errs, r.remove("volume", resources.volumes, &volumes, func(ctx context.Context, id string) error { + return r.client.VolumeRemove(ctx, id, volumeRemoveForce) + })) + + // Images. + errs = append(errs, r.remove("image", resources.images, &images, func(ctx context.Context, id string) error { + _, err := r.client.ImageRemove(ctx, id, imageRemoveOptions) + return err //nolint:wrapcheck // Wrapped by action. + })) + + r.logger.Info("removed", "containers", containers, "networks", networks, "volumes", volumes, "images", images) + + return errors.Join(errs...) +} + +// remove calls fn for each resource in resources and retries if necessary. +// Count is incremented for each resource that is successfully removed. +func (r *reaper) remove(resourceType string, resources []string, count *int, fn func(ctx context.Context, id string) error) error { + logger := r.logger.With("resource", resourceType) + logger.Debug("removing", "count", len(resources)) + + if len(resources) == 0 { + return nil + } + + todo := make(map[string]struct{}, len(resources)) + for _, id := range resources { + todo[id] = struct{}{} + } + + for attempt := 1; attempt <= r.cfg.RemoveRetries; attempt++ { + var retry bool + for id := range todo { + itemLogger := logger.With("id", id, "attempt", attempt) + + ctx, cancel := context.WithTimeout(context.Background(), r.cfg.RequestTimeout) + defer cancel() + + itemLogger.Debug("remove") + if err := fn(ctx, id); err != nil { + if errdefs.IsNotFound(err) { + // Already removed. + itemLogger.Debug("not found") + continue + } + + itemLogger.Error("remove", fieldError, err) + retry = true + continue + } + + delete(todo, id) + *count++ + } + + if retry { + if attempt < r.cfg.RemoveRetries { + time.Sleep(time.Second) + } + continue + } + + // All items were removed. + return nil + } + + // Some items were not removed. 
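+	// Every attempt allowed by RYUK_REMOVE_RETRIES has been used.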
+ return fmt.Errorf("%s left %d items", resourceType, len(todo)) +} diff --git a/reaper_test.go b/reaper_test.go new file mode 100644 index 0000000..22f4ec6 --- /dev/null +++ b/reaper_test.go @@ -0,0 +1,753 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log/slog" + "net" + "strings" + "sync" + "syscall" + "testing" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/image" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" + "github.com/docker/docker/errdefs" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/jsonmessage" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +const ( + containerID1 = "container1" + containerID2 = "container2" + networkID1 = "network1" + networkID2 = "network2" + volumeName1 = "volume1" + volumeName2 = "volume2" + imageID1 = "image1" + imageID2 = "image2" + testImage = "alpine:latest" + imageBuildResult = "moby.image.id" +) + +var ( + // testConfig is a config used for testing. + testConfig = withConfig(config{ + Port: 0, + ConnectionTimeout: time.Millisecond * 500, + ReconnectionTimeout: time.Millisecond * 100, + RequestTimeout: time.Millisecond * 50, + ShutdownTimeout: time.Second * 2, + RemoveRetries: 1, + RetryOffset: -time.Second * 2, + Verbose: true, + }) + + // discardLogger is a logger that discards all logs. + discardLogger = withLogger(slog.New(slog.NewTextHandler(io.Discard, nil))) + + // testLabels is a set of test labels. + testLabels = map[string]string{ + labelBase: "true", + labelBase + ".sessionID": "test-session", + labelBase + ".version": "0.1.0", + } + + // mockContext is a matcher that matches any context. + mockContext = mock.MatchedBy(func(context.Context) bool { return true }) + + // errNotFound is a docker not found error. + errNotFound = errdefs.NotFound(errors.New("not found")) +) + +func Test_newReaper(t *testing.T) { + ctx := context.Background() + t.Run("basic", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, testConfig) + require.NoError(t, err) + require.NotNil(t, r) + }) + + t.Run("with-config", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, testConfig) + require.NoError(t, err) + require.NotNil(t, r) + }) + + t.Run("bad-config", func(t *testing.T) { + r, err := newReaper(ctx, discardLogger, withConfig(config{})) + require.Error(t, err) + require.Nil(t, r) + }) + + t.Run("with-client", func(t *testing.T) { + cli := &mockClient{} + cli.On("Ping", mockContext).Return(types.Ping{}, nil) + cli.On("NegotiateAPIVersion", mockContext).Return() + r, err := newReaper(ctx, discardLogger, testConfig, withClient(cli)) + require.NoError(t, err) + require.NotNil(t, r) + }) +} + +// testConnect connects to the given endpoint, sends filter labels, +// and expects an ACK. The connection is closed when the context is done. 
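+// The labels are sent in the same "label=key=value&..." query format
+// that a real testcontainers client would use.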
+func testConnect(ctx context.Context, t *testing.T, endpoint string) { + t.Helper() + + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", endpoint) + require.NoError(t, err) + + labelFilters := make([]string, 0, len(testLabels)) + for l, v := range testLabels { + labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) + } + + _, err = conn.Write([]byte(strings.Join(labelFilters, "&") + "\n")) + require.NoError(t, err) + + buf := make([]byte, 4) + n, err := conn.Read(buf) + require.NoError(t, err) + require.Equal(t, "ACK\n", string(buf[:n])) + + go func() { + defer conn.Close() + <-ctx.Done() + }() +} + +// runTest is a test case for the reaper run method. +type runTest struct { + createdAt1 time.Time + pingErr error + + containerListErr error + containerRemoveErr1 error + containerRemoveErr2 error + containerCreated2 time.Time + + networkListErr error + networkRemoveErr1 error + networkRemoveErr2 error + networkCreated2 time.Time + + volumeListErr error + volumeRemoveErr1 error + volumeRemoveErr2 error + volumeCreated2 time.Time + + imageListErr error + imageRemoveErr1 error + imageRemoveErr2 error + imageCreated2 time.Time +} + +// newRunTest returns a new runTest with created at times set in the past. +func newRunTest() *runTest { + now := time.Now().Add(-time.Minute) + return &runTest{ + createdAt1: now, + containerCreated2: now, + networkCreated2: now, + volumeCreated2: now, + imageCreated2: now, + } +} + +// newMockClient returns a new mock client for the given test case. +func newMockClient(tc *runTest) *mockClient { + cli := &mockClient{} + cli.On("Ping", mock.Anything).Return(types.Ping{}, tc.pingErr) + cli.On("NegotiateAPIVersion", mockContext).Return() + + // Mock the container list and remove calls. + cli.On("ContainerList", mockContext, mock.Anything).Return([]types.Container{ + { + ID: containerID1, + Created: tc.createdAt1.Unix(), + Image: "testcontainers/test1:latest", + Names: []string{"test1"}, + Ports: []types.Port{{ + PrivatePort: 1001, + PublicPort: 8081, + Type: "tcp", + }}, + State: "running", + Labels: testLabels, + }, + { + ID: containerID2, + Created: tc.containerCreated2.Unix(), + Image: "testcontainers/test2:latest", + Names: []string{"test2"}, + Ports: []types.Port{{ + PrivatePort: 1002, + PublicPort: 8082, + Type: "tcp", + }}, + State: "running", + Labels: testLabels, + }, + }, tc.containerListErr) + + cli.On("ContainerRemove", mockContext, containerID1, containerRemoveOptions). + Return(tc.containerRemoveErr1) + cli.On("ContainerRemove", mockContext, containerID2, containerRemoveOptions). + Return(tc.containerRemoveErr2) + + // Mock the network list and remove calls. + cli.On("NetworkList", mockContext, mock.Anything). + Return([]network.Summary{ + {ID: networkID1, Created: tc.createdAt1}, + {ID: networkID2, Created: tc.networkCreated2}, + }, tc.networkListErr) + cli.On("NetworkRemove", mockContext, networkID1). + Return(tc.networkRemoveErr1) + cli.On("NetworkRemove", mockContext, networkID2). + Return(tc.networkRemoveErr2) + + // Mock the volume list and remove calls. + cli.On("VolumeList", mockContext, mock.Anything). + Return(volume.ListResponse{ + Volumes: []*volume.Volume{ + {Name: volumeName1, CreatedAt: tc.createdAt1.Format(time.RFC3339)}, + {Name: volumeName2, CreatedAt: tc.volumeCreated2.Format(time.RFC3339)}, + }, + }, tc.volumeListErr) + cli.On("VolumeRemove", mockContext, volumeName1, volumeRemoveForce). + Return(tc.volumeRemoveErr1) + cli.On("VolumeRemove", mockContext, volumeName2, volumeRemoveForce). 
+ Return(tc.volumeRemoveErr2) + + // Mock the image list and remove calls. + cli.On("ImageList", mockContext, mock.Anything).Return([]image.Summary{ + {ID: imageID1, Created: tc.createdAt1.Unix()}, + {ID: imageID2, Created: tc.imageCreated2.Unix()}, + }, tc.imageListErr) + cli.On("ImageRemove", mockContext, imageID1, imageRemoveOptions). + Return([]image.DeleteResponse{{Deleted: imageID1}}, tc.imageRemoveErr1) + cli.On("ImageRemove", mockContext, imageID2, imageRemoveOptions). + Return([]image.DeleteResponse{{Deleted: imageID2}}, tc.imageRemoveErr2) + + return cli +} + +// testReaperRun runs the reaper with the given test case and returns the log output. +func testReaperRun(t *testing.T, tc *runTest) (string, error) { + t.Helper() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var buf safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + client := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(client), testConfig) + require.NoError(t, err) + + errCh := make(chan error, 1) + go func() { + errCh <- r.run(ctx) + }() + + clientCtx, clientCancel := context.WithTimeout(ctx, time.Millisecond*500) + t.Cleanup(clientCancel) + + addr := r.listener.Addr().String() + testConnect(clientCtx, t, addr) + testConnect(clientCtx, t, addr) + + select { + case err = <-errCh: + case <-ctx.Done(): + t.Fatal("timeout", buf.String()) + } + + // Standard checks for basic functionality. + log := buf.String() + require.Contains(t, log, "listening address="+addr) + require.Contains(t, log, "client connected") + require.Contains(t, log, "adding filter") + + return log, err +} + +func Test_newReaper_Run(t *testing.T) { + t.Run("end-to-end", func(t *testing.T) { + tc := newRunTest() + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("container-created", func(t *testing.T) { + tc := newRunTest() + tc.containerCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected containers: container container2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("network-created", func(t *testing.T) { + tc := newRunTest() + tc.networkCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected networks: network network2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("volume-created", func(t *testing.T) { + tc := newRunTest() + tc.volumeCreated2 = time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected volumes: volume volume2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("image-created", func(t *testing.T) { + tc := newRunTest() + tc.imageCreated2 = 
time.Now().Add(time.Millisecond * 200) + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.Contains(t, log, `msg="change detected, waiting again" error="affected images: image image2: changes detected"`) + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=2") + }) + + t.Run("not-found", func(t *testing.T) { + tc := newRunTest() + tc.containerRemoveErr1 = errNotFound + tc.networkRemoveErr1 = errNotFound + tc.volumeRemoveErr1 = errNotFound + tc.imageRemoveErr1 = errNotFound + log, err := testReaperRun(t, tc) + require.NoError(t, err) + + require.NotContains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=1 networks=1 volumes=1 images=1") + }) + + t.Run("container-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.containerRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: container left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=1 networks=2 volumes=2 images=2") + }) + + t.Run("network-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.networkRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: network left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=1 volumes=2 images=2") + }) + + t.Run("volume-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.volumeRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: volume left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=1 images=2") + }) + + t.Run("image-remove-error", func(t *testing.T) { + tc := newRunTest() + tc.imageRemoveErr1 = errors.New("remove error") + log, err := testReaperRun(t, tc) + require.EqualError(t, err, "prune: image left 1 items") + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=1") + }) + + t.Run("container-list-error", func(t *testing.T) { + tc := newRunTest() + tc.containerListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "container list: "+tc.containerListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=0 networks=2 volumes=2 images=2") + }) + + t.Run("network-list-error", func(t *testing.T) { + tc := newRunTest() + tc.networkListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "network list: "+tc.networkListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=0 volumes=2 images=2") + }) + + t.Run("volume-list-error", func(t *testing.T) { + tc := newRunTest() + tc.volumeListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "volume list: "+tc.volumeListErr.Error()) + + require.Contains(t, log, 
"level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=0 images=2") + }) + + t.Run("image-list-error", func(t *testing.T) { + tc := newRunTest() + tc.imageListErr = errors.New("list error") + log, err := testReaperRun(t, tc) + require.Error(t, err) + require.Contains(t, err.Error(), "image list: "+tc.imageListErr.Error()) + + require.Contains(t, log, "level=ERROR") + require.NotContains(t, log, "level=WARN") + require.Contains(t, log, "removed containers=2 networks=2 volumes=2 images=0") + }) +} + +// safeBuffer is a buffer safe for concurrent use. +type safeBuffer struct { + buf bytes.Buffer + mtx sync.Mutex +} + +// Write writes to the buffer. +func (sb *safeBuffer) Write(p []byte) (n int, err error) { + sb.mtx.Lock() + defer sb.mtx.Unlock() + + return sb.buf.Write(p) +} + +// String returns the buffer as a string. +func (sb *safeBuffer) String() string { + sb.mtx.Lock() + defer sb.mtx.Unlock() + + return sb.buf.String() +} + +func TestAbortedClient(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + cli := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(cli), testConfig) + require.NoError(t, err) + + // Start processing clients. + go r.processClients() + + // Fake a shutdown to trigger the client abort. + close(r.shutdown) + + addr := r.listener.Addr().String() + + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", addr) + require.NoError(t, err) + + // With shutdown triggered the client should be aborted. + // The write will still succeed due to buffering but the read will fail. + _, err = conn.Write([]byte("some-filter\n")) + require.NoError(t, err) + + buf := make([]byte, 4) + n, err := conn.Read(buf) + require.Error(t, err) + switch { + case errors.Is(err, io.EOF), + errors.Is(err, syscall.ECONNRESET): + // Expected errors. + default: + t.Fatal("unexpected read error:", err) + } + require.Zero(t, n) + require.Contains(t, log.String(), "shutdown, aborting client") +} + +func TestShutdownTimeout(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + tc := newRunTest() + cli := newMockClient(tc) + r, err := newReaper(ctx, logger, withClient(cli), testConfig) + require.NoError(t, err) + + errCh := make(chan error, 1) + runCtx, runCancel := context.WithCancel(ctx) + t.Cleanup(runCancel) + go func() { + errCh <- r.run(runCtx) + }() + + require.NoError(t, err) + + testConnect(ctx, t, r.listener.Addr().String()) + runCancel() + + select { + case err = <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + t.Fatal("timeout", log.String()) + } + + require.Contains(t, log.String(), "shutdown timeout") +} + +func TestReapContainer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + // Run a test container. + cli := testClient(t) + config := &container.Config{ + Image: testImage, + Cmd: []string{"sleep", "10"}, + Labels: testLabels, + } + resp, err := cli.ContainerCreate(ctx, config, nil, nil, nil, testID()) + if errdefs.IsNotFound(err) { + // Image not found, pull it. 
+		var rc io.ReadCloser
+		rc, err = cli.ImagePull(ctx, testImage, image.PullOptions{})
+		require.NoError(t, err)
+		t.Cleanup(func() {
+			require.NoError(t, rc.Close())
+		})
+		_, err = io.Copy(io.Discard, rc)
+		require.NoError(t, err)
+		resp, err = cli.ContainerCreate(ctx, config, nil, nil, nil, testID())
+	}
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		// Ensure the container was / is removed.
+		err := cli.ContainerRemove(ctx, resp.ID, container.RemoveOptions{})
+		require.Error(t, err)
+		require.True(t, errdefs.IsNotFound(err))
+	})
+
+	// Speed up reaper for testing.
+	t.Setenv("RYUK_RECONNECTION_TIMEOUT", "10ms")
+	t.Setenv("RYUK_SHUTDOWN_TIMEOUT", "1s")
+	t.Setenv("RYUK_PORT", "0")
+
+	testReaper(ctx, t,
+		"msg=removed containers=1 networks=0 volumes=0 images=0",
+		"msg=remove resource=container id="+resp.ID,
+	)
+}
+
+func TestReapNetwork(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	t.Cleanup(cancel)
+
+	// Create a test network.
+	cli := testClient(t)
+	resp, err := cli.NetworkCreate(ctx, testID(), network.CreateOptions{
+		Labels: testLabels,
+	})
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		// Ensure the network was / is removed.
+		err := cli.NetworkRemove(ctx, resp.ID)
+		require.Error(t, err)
+		require.True(t, errdefs.IsNotFound(err))
+	})
+
+	testReaper(ctx, t,
+		"msg=removed containers=0 networks=1 volumes=0 images=0",
+		"msg=remove resource=network id="+resp.ID,
+	)
+}
+
+func TestReapVolume(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	t.Cleanup(cancel)
+
+	// Create a test volume.
+	cli := testClient(t)
+	resp, err := cli.VolumeCreate(ctx, volume.CreateOptions{
+		Labels: testLabels,
+	})
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		// Ensure the volume was / is removed.
+		err := cli.VolumeRemove(ctx, resp.Name, false)
+		require.Error(t, err)
+		require.True(t, errdefs.IsNotFound(err))
+	})
+
+	testReaper(ctx, t,
+		"msg=removed containers=0 networks=0 volumes=1 images=0",
+		"msg=remove resource=volume id="+resp.Name,
+	)
+}
+
+func TestReapImage(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	t.Cleanup(cancel)
+
+	// Create a test image.
+	cli := testClient(t)
+	buildContext, err := archive.Tar("testdata", archive.Uncompressed)
+	require.NoError(t, err)
+	resp, err := cli.ImageBuild(ctx, buildContext, types.ImageBuildOptions{
+		Version: types.BuilderBuildKit,
+		Labels:  testLabels,
+	})
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		require.NoError(t, resp.Body.Close())
+	})
+
+	// Process the build output, discarding it so we catch any
+	// errors and get the image ID.
+	var imageID string
+	auxCallback := func(msg jsonmessage.JSONMessage) {
+		if msg.ID != imageBuildResult {
+			return
+		}
+		var result types.BuildResult
+		err := json.Unmarshal(*msg.Aux, &result)
+		require.NoError(t, err)
+		imageID = result.ID
+	}
+	err = jsonmessage.DisplayJSONMessagesStream(resp.Body, io.Discard, 0, false, auxCallback)
+	require.NoError(t, err)
+	require.NotEmpty(t, imageID)
+
+	t.Cleanup(func() {
+		// Ensure the image was / is removed.
+		resp, err := cli.ImageRemove(ctx, imageID, image.RemoveOptions{})
+		require.Error(t, err)
+		require.Empty(t, resp)
+	})
+
+	testReaper(ctx, t,
+		"msg=removed containers=0 networks=0 volumes=0 images=1",
+		"msg=remove resource=image id="+imageID,
+	)
+}
+
+// testID returns a unique test ID.
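+// The ID is derived from the current time in nanoseconds.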
+func testID() string { + return fmt.Sprintf("test-%d", time.Now().UnixNano()) +} + +// testClient returns a new docker client for testing. +func testClient(t *testing.T) *client.Client { + t.Helper() + + cli, err := client.NewClientWithOpts( + client.FromEnv, + client.WithAPIVersionNegotiation(), + ) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, cli.Close()) + }) + + return cli +} + +// testReaper runs the reaper with test labels and validates that expect +// strings are found in the log output. +func testReaper(ctx context.Context, t *testing.T, expect ...string) { + t.Helper() + + // Speed up reaper for testing and use a random port. + t.Setenv("RYUK_RECONNECTION_TIMEOUT", "10ms") + t.Setenv("RYUK_SHUTDOWN_TIMEOUT", "1s") + t.Setenv("RYUK_PORT", "0") + + // Start the reaper. + var log safeBuffer + logger := withLogger(slog.New(slog.NewTextHandler(&log, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }))) + r, err := newReaper(ctx, logger) + require.NoError(t, err) + + reaperErr := make(chan error, 1) + go func() { + reaperErr <- r.run(ctx) + }() + + // Inform the reaper of the labels to reap. + addr := r.listener.Addr().String() + clientCtx, clientCancel := context.WithCancel(ctx) + t.Cleanup(clientCancel) // Ensure the clientCtx is cancelled on failure. + testConnect(clientCtx, t, addr) + clientCancel() + + select { + case err = <-reaperErr: + require.NoError(t, err) + + data := log.String() + for _, e := range expect { + require.Contains(t, data, e) + } + case <-ctx.Done(): + t.Fatal("timeout", log.String()) + } +} diff --git a/testdata/Dockerfile b/testdata/Dockerfile new file mode 100644 index 0000000..c35f1b5 --- /dev/null +++ b/testdata/Dockerfile @@ -0,0 +1 @@ +FROM scratch