|
| 1 | +// Copyright (c) 2022 Uber Technologies, Inc. |
| 2 | +// |
| 3 | +// Permission is hereby granted, free of charge, to any person obtaining a copy |
| 4 | +// of this software and associated documentation files (the "Software"), to deal |
| 5 | +// in the Software without restriction, including without limitation the rights |
| 6 | +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| 7 | +// copies of the Software, and to permit persons to whom the Software is |
| 8 | +// furnished to do so, subject to the following conditions: |
| 9 | +// |
| 10 | +// The above copyright notice and this permission notice shall be included in |
| 11 | +// all copies or substantial portions of the Software. |
| 12 | +// |
| 13 | +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| 14 | +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| 15 | +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| 16 | +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| 17 | +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| 18 | +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| 19 | +// THE SOFTWARE. |
| 20 | + |
| 21 | +package dockerexternal |
| 22 | + |
| 23 | +import ( |
| 24 | + "context" |
| 25 | + "errors" |
| 26 | + "fmt" |
| 27 | + "math/rand" |
| 28 | + "net" |
| 29 | + "strconv" |
| 30 | + "time" |
| 31 | + |
| 32 | + "github.com/m3db/m3/src/integration/resources/docker/dockerexternal/etcdintegration/bridge" |
| 33 | + xdockertest "github.com/m3db/m3/src/x/dockertest" |
| 34 | + xerrors "github.com/m3db/m3/src/x/errors" |
| 35 | + "github.com/m3db/m3/src/x/instrument" |
| 36 | + "github.com/m3db/m3/src/x/retry" |
| 37 | + |
| 38 | + "github.com/ory/dockertest/v3" |
| 39 | + "github.com/ory/dockertest/v3/docker" |
| 40 | + clientv3 "go.etcd.io/etcd/client/v3" |
| 41 | + "go.uber.org/zap" |
| 42 | + "google.golang.org/grpc" |
| 43 | +) |
| 44 | + |
| 45 | +var ( |
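|  | +	// etcdImage is the docker image used to run etcd. The bitnami image is used here because it allows
|  | +	// disabling authentication via the ALLOW_NONE_AUTHENTICATION environment variable (set in Setup below).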
| 46 | + etcdImage = xdockertest.Image{ |
| 47 | + Name: "bitnami/etcd", |
| 48 | + Tag: "3.5.4", |
| 49 | + } |
| 50 | +) |
| 51 | + |
| 52 | +// NewEtcd constructs a single etcd node, running in a docker container. |
| 53 | +func NewEtcd( |
| 54 | + pool *dockertest.Pool, |
| 55 | + instrumentOpts instrument.Options, |
| 56 | + options ...EtcdClusterOption, |
| 57 | +) (*EtcdNode, error) { |
| 58 | + logger := instrumentOpts.Logger() |
| 59 | + if logger == nil { |
| 60 | + logger = zap.NewNop() |
| 61 | + instrumentOpts = instrumentOpts.SetLogger(logger) |
| 62 | + } |
| 63 | + |
| 64 | + var opts etcdClusterOptions |
| 65 | + for _, o := range options { |
| 66 | + o.apply(&opts) |
| 67 | + } |
| 68 | + |
| 69 | + return &EtcdNode{ |
| 70 | + pool: pool, |
| 71 | + instrumentOpts: instrumentOpts, |
| 72 | + logger: logger, |
| 73 | + opts: opts, |
| 74 | + // newClient exists solely for mocking in tests. We can't take the etcd client in as a dependency here,
| 75 | + // because we don't know the endpoints until the container is running, so we have to construct it ourselves.
| 76 | + // Hence the two hops: a mock newClient returns a mock memberClient.
| 77 | + newClient: func(config clientv3.Config) (memberClient, error) { |
| 78 | + return clientv3.New(config) |
| 79 | + }, |
| 80 | + }, nil |
| 81 | +} |
| 82 | + |
| 83 | +// EtcdNode is a single etcd node, running via a docker container. |
| 84 | +//nolint:maligned |
| 85 | +type EtcdNode struct { |
| 86 | + instrumentOpts instrument.Options |
| 87 | + logger *zap.Logger |
| 88 | + pool *dockertest.Pool |
| 89 | + opts etcdClusterOptions |
| 90 | + |
| 91 | + // namePrefix is used to name the container. It exists solely for unit tests in this package; otherwise it is effectively a constant.
| 92 | + namePrefix string |
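|  | + // newClient constructs the etcd client used for health checks; it is a field so that tests can substitute a mock memberClient.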
| 93 | + newClient func(config clientv3.Config) (memberClient, error) |
| 94 | + |
| 95 | + // initialized by Setup |
| 96 | + address string |
| 97 | + resource *xdockertest.Resource |
| 98 | + etcdCli *clientv3.Client |
| 99 | + bridge *bridge.Bridge |
| 100 | + |
| 101 | + stopped bool |
| 102 | +} |
| 103 | + |
| 104 | +// Setup starts the docker container. |
| 105 | +func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) { |
| 106 | + if c.resource != nil { |
| 107 | + return errors.New("etcd node already started")
| 108 | + } |
| 109 | + |
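|  | + // Generate a pseudo-random suffix so that concurrently running test suites don't collide on container names.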
| 110 | + // nolint:gosec |
| 111 | + id := rand.New(rand.NewSource(time.Now().UnixNano())).Int() |
| 112 | + |
| 113 | + namePrefix := "m3-test-etcd-" |
| 114 | + if c.namePrefix != "" { |
| 115 | + // support overriding for tests |
| 116 | + namePrefix = c.namePrefix |
| 117 | + } |
| 118 | + |
| 119 | + // Roughly, runs: |
| 120 | + // docker run --rm --env ALLOW_NONE_AUTHENTICATION=yes -it --name Etcd bitnami/etcd |
| 121 | + // Port 2379 on the container is bound to a free port on the host |
| 122 | + resource, err := xdockertest.NewDockerResource(c.pool, xdockertest.ResourceOptions{ |
| 123 | + OverrideDefaults: false, |
| 124 | + // TODO: document what Source represents here.
| 125 | + Source: "etcd", |
| 126 | + ContainerName: fmt.Sprintf("%s%d", namePrefix, id), |
| 127 | + Image: etcdImage, |
| 128 | + Env: []string{"ALLOW_NONE_AUTHENTICATION=yes"}, |
| 129 | + InstrumentOpts: c.instrumentOpts, |
| 130 | + PortMappings: map[docker.Port][]docker.PortBinding{ |
| 131 | + "2379/tcp": {{ |
| 132 | + HostIP: "0.0.0.0", |
| 133 | + HostPort: strconv.Itoa(c.opts.port), |
| 134 | + }}, |
| 135 | + }, |
| 136 | + NoNetworkOverlay: true, |
| 137 | + }) |
| 138 | + |
| 139 | + if err != nil { |
| 140 | + return fmt.Errorf("starting etcd container: %w", err) |
| 141 | + } |
| 142 | + |
| 143 | + container := resource.Resource().Container |
| 144 | + c.logger.Info("etcd container started", |
| 145 | + zap.String("containerID", container.ID), |
| 146 | + zap.Any("ports", container.NetworkSettings.Ports), |
| 147 | + // Uncomment if you need gory details about the container printed; equivalent of `docker inspect <id>`
| 148 | + // zap.Any("container", container), |
| 149 | + ) |
| 150 | + // Extract the host port on which etcd is listening.
| 151 | + // This comes from the equivalent of `docker inspect <container_id>`.
| 152 | + portBinds := container.NetworkSettings.Ports["2379/tcp"] |
| 153 | + |
| 154 | + // If running inside a docker container (e.g. on buildkite), route to etcd using the published port on the *host* machine.
| 155 | + // See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16 |
| 156 | + ipAddr := "127.0.0.1" |
| 157 | + _, err = net.ResolveIPAddr("ip4", "host.docker.internal") |
| 158 | + if err == nil { |
| 159 | + c.logger.Info("Running tests within a docker container (e.g. for buildkite. " + |
| 160 | + "Using host.docker.internal to talk to etcd") |
| 161 | + ipAddr = "host.docker.internal" |
| 162 | + } |
| 163 | + |
| 164 | + c.resource = resource |
| 165 | + c.address = fmt.Sprintf("%s:%s", ipAddr, portBinds[0].HostPort) |
| 166 | + |
| 167 | + etcdCli, err := clientv3.New( |
| 168 | + clientv3.Config{ |
| 169 | + Endpoints: []string{c.address}, |
| 170 | + DialTimeout: 5 * time.Second, |
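|  | + // WithBlock makes clientv3.New wait for a connection to be established (up to DialTimeout) instead of dialing lazily.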
| 171 | + DialOptions: []grpc.DialOption{grpc.WithBlock()}, |
| 172 | + Logger: c.logger, |
| 173 | + }, |
| 174 | + ) |
| 175 | + if err != nil { |
| 176 | + return fmt.Errorf("constructing etcd client: %w", err) |
| 177 | + } |
| 178 | + |
| 179 | + defer func() { |
| 180 | + if err := etcdCli.Close(); err != nil { |
| 181 | + var merr xerrors.MultiError |
| 182 | + closeErr = merr. |
| 183 | + Add(closeErr). |
| 184 | + Add(fmt.Errorf("closing etcd client: %w", err)). |
| 185 | + FinalError() |
| 186 | + } |
| 187 | + }() |
| 188 | + |
| 189 | + return c.waitForHealth(ctx, etcdCli) |
| 190 | +} |
| 191 | + |
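|  | +// containerHostPort returns the host port bound to the container's etcd client port (2379/tcp), addressed via 127.0.0.1.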
| 192 | +func (c *EtcdNode) containerHostPort() string { |
| 193 | + portBinds := c.resource.Resource().Container.NetworkSettings.Ports["2379/tcp"] |
| 194 | + |
| 195 | + return fmt.Sprintf("127.0.0.1:%s", portBinds[0].HostPort) |
| 196 | +} |
| 197 | + |
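|  | +// waitForHealth repeatedly fetches the member list until etcd responds successfully, or ctx is cancelled or times out.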
| 198 | +func (c *EtcdNode) waitForHealth(ctx context.Context, memberCli memberClient) error { |
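|  | + // Retry indefinitely with capped backoff; AttemptContext gives up once ctx is cancelled or its deadline passes.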
| 199 | + retrier := retry.NewRetrier(retry.NewOptions(). |
| 200 | + SetForever(true). |
| 201 | + SetMaxBackoff(5 * time.Second), |
| 202 | + ) |
| 203 | + |
| 204 | + var timeout time.Duration |
| 205 | + deadline, ok := ctx.Deadline() |
| 206 | + if ok { |
| 207 | + timeout = time.Until(deadline)
| 208 | + } |
| 209 | + c.logger.Info( |
| 210 | + "Waiting for etcd to report healthy (via member list)", |
| 211 | + zap.String("timeout", timeout.String()), |
| 212 | + ) |
| 213 | + err := retrier.AttemptContext(ctx, func() error { |
| 214 | + _, err := memberCli.MemberList(ctx) |
| 215 | + if err != nil { |
| 216 | + c.logger.Info( |
| 217 | + "Failed connecting to etcd while waiting for container to come up", |
| 218 | + zap.Error(err), |
| 219 | + zap.String("endpoints", c.address), |
| 220 | + ) |
| 221 | + } |
| 222 | + return err |
| 223 | + }) |
| 224 | + if err == nil { |
| 225 | + c.logger.Info("etcd is healthy") |
| 226 | + return nil |
| 227 | + } |
| 228 | + return fmt.Errorf("waiting for etcd to become healthy: %w", err) |
| 229 | +} |
| 230 | + |
| 231 | +// Close stops the etcd node, and removes it. |
| 232 | +func (c *EtcdNode) Close(ctx context.Context) error { |
| 233 | + var err xerrors.MultiError |
| 234 | + err = err. |
| 235 | + Add(c.resource.Close()) |
| 236 | + return err.FinalError() |
| 237 | +} |
| 238 | + |
| 239 | +// Address is the host:port of the etcd node for use by etcd clients. |
| 240 | +func (c *EtcdNode) Address() string { |
| 241 | + return c.address |
| 242 | +} |
| 243 | + |
| 244 | +// Stop stops the etcd container, but does not purge it. A stopped container can be restarted with Restart. |
| 245 | +func (c *EtcdNode) Stop(ctx context.Context) error { |
| 246 | + if c.stopped { |
| 247 | + return errors.New("etcd node is already stopped") |
| 248 | + } |
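|  | + // A timeout of 0 tells docker not to wait for a graceful shutdown before killing the container.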
| 249 | + if err := c.pool.Client.StopContainerWithContext(c.resource.Resource().Container.ID, 0, ctx); err != nil { |
| 250 | + return err |
| 251 | + } |
| 252 | + c.stopped = true |
| 253 | + return nil |
| 254 | +} |
| 255 | + |
| 256 | +// Restart restarts the etcd container. If it isn't currently stopped, the etcd container will be stopped and then |
| 257 | +// started; otherwise it will just be started.
| 258 | +func (c *EtcdNode) Restart(ctx context.Context) error { |
| 259 | + if !c.stopped { |
| 260 | + c.logger.Info("Stopping etcd node") |
| 261 | + |
| 262 | + if err := c.Stop(ctx); err != nil { |
| 263 | + return fmt.Errorf("stopping etcd node for Restart: %w", err) |
| 264 | + } |
| 265 | + } |
| 266 | + err := c.pool.Client.StartContainerWithContext(c.resource.Resource().Container.ID, nil, ctx) |
| 267 | + if err != nil { |
| 268 | + return fmt.Errorf("starting etcd node for Restart: %w", err) |
| 269 | + } |
| 270 | + c.stopped = false |
| 271 | + return nil |
| 272 | +} |
| 273 | + |
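|  | +// Compile-time assertion that *clientv3.Client implements memberClient.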
| 274 | +var _ memberClient = (*clientv3.Client)(nil) |
| 275 | + |
| 276 | +// memberClient exposes just one method of *clientv3.Client, for testing purposes (it is easier to mock).
| 277 | +type memberClient interface { |
| 278 | + MemberList(ctx context.Context) (*clientv3.MemberListResponse, error) |
| 279 | +} |