Skip to content

Commit

Permalink
etcd_docker 4: Incorporate docker based etcd into Go integration tests (
Browse files Browse the repository at this point in the history
#4148)

PR 4 for #4144

High level approach is as described in #4144 .

This PR integrates docker based etcd into our Go integration tests. It removes the need to have the embed package running
in m3db for them, but doesn't yet touch that functionality.

commit-id:3ae12ffd
andrewmains12 committed Mar 30, 2023
1 parent 15d7fb3 commit 135cdd8
Showing 11 changed files with 254 additions and 117 deletions.
11 changes: 11 additions & 0 deletions src/integration/aggregator/aggregator.go
Original file line number Diff line number Diff line change
@@ -117,6 +117,17 @@ ingest:
maxBackoff: 10s
jitter: true
storeMetricsType: true
clusterManagement:
etcd:
env: default_env
zone: embedded
service: m3db
cacheDir: /var/lib/m3kv
etcdClusters:
- zone: embedded
endpoints:
- 127.0.0.1:2379
`

// TestAggregatorAggregatorConfig is the test config for the aggregators.
2 changes: 2 additions & 0 deletions src/integration/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build cluster_integration
// +build cluster_integration

//
// Copyright (c) 2021 Uber Technologies, Inc.
//
74 changes: 53 additions & 21 deletions src/integration/repair/repair_and_replication_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build cluster_integration
// +build cluster_integration

//
// Copyright (c) 2021 Uber Technologies, Inc.
//
@@ -23,28 +25,45 @@
package repair

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
"github.com/m3db/m3/src/integration/resources/inprocess"
"github.com/m3db/m3/src/x/instrument"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestRepairAndReplication(t *testing.T) {
t.Skip("failing after etcd containerization; fix.")
cluster1, cluster2, closer := testSetup(t)
defer closer()

RunTest(t, cluster1, cluster2)
}

func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func()) {
fullCfgs1 := getClusterFullConfgs(t)
fullCfgs2 := getClusterFullConfgs(t)
pool, err := dockertest.NewPool("")
require.NoError(t, err)

ep1 := fullCfgs1.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints
ep2 := fullCfgs2.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints
etcd1 := mustNewStartedEtcd(t, pool)
etcd2 := mustNewStartedEtcd(t, pool)

ep1 := []string{etcd1.Address()}
ep2 := []string{etcd2.Address()}

cluster1Opts := newTestClusterOptions()
cluster1Opts.EtcdEndpoints = ep1

cluster2Opts := newTestClusterOptions()
cluster2Opts.EtcdEndpoints = ep2

fullCfgs1 := getClusterFullConfgs(t, cluster1Opts)
fullCfgs2 := getClusterFullConfgs(t, cluster2Opts)

setRepairAndReplicationCfg(
&fullCfgs1,
@@ -57,19 +76,28 @@ func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func
ep1,
)

cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, clusterOptions)
cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, cluster1Opts)
require.NoError(t, err)

cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, clusterOptions)
cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, cluster2Opts)
require.NoError(t, err)

return cluster1, cluster2, func() {
etcd1.Close(context.TODO())
etcd2.Close(context.TODO())
assert.NoError(t, cluster1.Cleanup())
assert.NoError(t, cluster2.Cleanup())
}
}

func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification {
func mustNewStartedEtcd(t *testing.T, pool *dockertest.Pool) *dockerexternal.EtcdNode {
etcd, err := dockerexternal.NewEtcd(pool, instrument.NewOptions())
require.NoError(t, err)
require.NoError(t, etcd.Setup(context.TODO()))
return etcd
}

func getClusterFullConfgs(t *testing.T, clusterOptions resources.ClusterOptions) inprocess.ClusterSpecification {
cfgs, err := inprocess.NewClusterConfigsFromYAML(
TestRepairDBNodeConfig, TestRepairCoordinatorConfig, "",
)
@@ -84,18 +112,22 @@ func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification {
func setRepairAndReplicationCfg(fullCfg *inprocess.ClusterSpecification, clusterName string, endpoints []string) {
for _, dbnode := range fullCfg.Configs.DBNodes {
dbnode.DB.Replication.Clusters[0].Name = clusterName
dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints = endpoints
etcdService := &(dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0])
etcdService.AutoSyncInterval = -1
etcdService.Endpoints = endpoints
}
}

var clusterOptions = resources.ClusterOptions{
DBNode: &resources.DBNodeClusterOptions{
RF: 2,
NumShards: 4,
NumInstances: 1,
NumIsolationGroups: 2,
},
Coordinator: resources.CoordinatorClusterOptions{
GeneratePorts: true,
},
func newTestClusterOptions() resources.ClusterOptions {
return resources.ClusterOptions{
DBNode: &resources.DBNodeClusterOptions{
RF: 2,
NumShards: 4,
NumInstances: 1,
NumIsolationGroups: 2,
},
Coordinator: resources.CoordinatorClusterOptions{
GeneratePorts: true,
},
}
}
50 changes: 25 additions & 25 deletions src/integration/resources/coordinator_client.go
Original file line number Diff line number Diff line change
@@ -59,8 +59,8 @@ var errUnknownServiceType = errors.New("unknown service type")
// operation until successful.
type RetryFunc func(op func() error) error

// ZapMethod appends the method as a log field.
func ZapMethod(s string) zapcore.Field { return zap.String("method", s) }
// zapMethod appends the method as a log field.
func zapMethod(s string) zapcore.Field { return zap.String("method", s) }

// CoordinatorClient is a client use to invoke API calls
// on a coordinator
@@ -97,7 +97,7 @@ func (c *CoordinatorClient) makeURL(resource string) string {
func (c *CoordinatorClient) GetNamespace() (admin.NamespaceGetResponse, error) {
url := c.makeURL("api/v1/services/m3db/namespace")
logger := c.logger.With(
ZapMethod("getNamespace"), zap.String("url", url))
zapMethod("getNamespace"), zap.String("url", url))

//nolint:noctx
resp, err := c.client.Get(url)
@@ -129,7 +129,7 @@ func (c *CoordinatorClient) GetPlacement(opts PlacementRequestOptions) (admin.Pl
}
url := c.makeURL(handlerurl)
logger := c.logger.With(
ZapMethod("getPlacement"), zap.String("url", url))
zapMethod("getPlacement"), zap.String("url", url))

resp, err := c.makeRequest(logger, url, placementhandler.GetHTTPMethod, nil, placementOptsToMap(opts))
if err != nil {
@@ -163,7 +163,7 @@ func (c *CoordinatorClient) InitPlacement(
}
url := c.makeURL(handlerurl)
logger := c.logger.With(
ZapMethod("initPlacement"), zap.String("url", url))
zapMethod("initPlacement"), zap.String("url", url))

resp, err := c.makeRequest(logger, url, placementhandler.InitHTTPMethod, &initRequest, placementOptsToMap(opts))
if err != nil {
@@ -194,7 +194,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er
}
url := c.makeURL(handlerurl)
logger := c.logger.With(
ZapMethod("deleteAllPlacements"), zap.String("url", url))
zapMethod("deleteAllPlacements"), zap.String("url", url))

resp, err := c.makeRequest(
logger, url, placementhandler.DeleteAllHTTPMethod, nil, placementOptsToMap(opts),
@@ -221,7 +221,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er
// NB: if the name string is empty, this will instead
// check for a successful response.
func (c *CoordinatorClient) WaitForNamespace(name string) error {
logger := c.logger.With(ZapMethod("waitForNamespace"))
logger := c.logger.With(zapMethod("waitForNamespace"))
return c.retryFunc(func() error {
ns, err := c.GetNamespace()
if err != nil {
@@ -250,7 +250,7 @@ func (c *CoordinatorClient) WaitForNamespace(name string) error {
func (c *CoordinatorClient) WaitForInstances(
ids []string,
) error {
logger := c.logger.With(ZapMethod("waitForPlacement"))
logger := c.logger.With(zapMethod("waitForPlacement"))
return c.retryFunc(func() error {
placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB})
if err != nil {
@@ -282,7 +282,7 @@ func (c *CoordinatorClient) WaitForInstances(

// WaitForShardsReady waits until all shards gets ready.
func (c *CoordinatorClient) WaitForShardsReady() error {
logger := c.logger.With(ZapMethod("waitForShards"))
logger := c.logger.With(zapMethod("waitForShards"))
return c.retryFunc(func() error {
placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB})
if err != nil {
@@ -307,7 +307,7 @@ func (c *CoordinatorClient) WaitForShardsReady() error {
func (c *CoordinatorClient) WaitForClusterReady() error {
var (
url = c.makeURL("ready")
logger = c.logger.With(ZapMethod("waitForClusterReady"), zap.String("url", url))
logger = c.logger.With(zapMethod("waitForClusterReady"), zap.String("url", url))
)
return c.retryFunc(func() error {
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
@@ -350,7 +350,7 @@ func (c *CoordinatorClient) CreateDatabase(
) (admin.DatabaseCreateResponse, error) {
url := c.makeURL("api/v1/database/create")
logger := c.logger.With(
ZapMethod("createDatabase"), zap.String("url", url),
zapMethod("createDatabase"), zap.String("url", url),
zap.String("request", addRequest.String()))

resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil)
@@ -383,7 +383,7 @@ func (c *CoordinatorClient) AddNamespace(
) (admin.NamespaceGetResponse, error) {
url := c.makeURL("api/v1/services/m3db/namespace")
logger := c.logger.With(
ZapMethod("addNamespace"), zap.String("url", url),
zapMethod("addNamespace"), zap.String("url", url),
zap.String("request", addRequest.String()))

resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil)
@@ -411,7 +411,7 @@ func (c *CoordinatorClient) UpdateNamespace(
) (admin.NamespaceGetResponse, error) {
url := c.makeURL("api/v1/services/m3db/namespace")
logger := c.logger.With(
ZapMethod("updateNamespace"), zap.String("url", url),
zapMethod("updateNamespace"), zap.String("url", url),
zap.String("request", req.String()))

resp, err := c.makeRequest(logger, url, http.MethodPut, &req, nil)
@@ -431,7 +431,7 @@ func (c *CoordinatorClient) UpdateNamespace(
func (c *CoordinatorClient) setNamespaceReady(name string) error {
url := c.makeURL("api/v1/services/m3db/namespace/ready")
logger := c.logger.With(
ZapMethod("setNamespaceReady"), zap.String("url", url),
zapMethod("setNamespaceReady"), zap.String("url", url),
zap.String("namespace", name))

_, err := c.makeRequest(logger, url, http.MethodPost, // nolint: bodyclose
@@ -445,7 +445,7 @@ func (c *CoordinatorClient) setNamespaceReady(name string) error {
// DeleteNamespace removes the namespace.
func (c *CoordinatorClient) DeleteNamespace(namespaceID string) error {
url := c.makeURL("api/v1/services/m3db/namespace/" + namespaceID)
logger := c.logger.With(ZapMethod("deleteNamespace"), zap.String("url", url))
logger := c.logger.With(zapMethod("deleteNamespace"), zap.String("url", url))

if _, err := c.makeRequest(logger, url, http.MethodDelete, nil, nil); err != nil { // nolint: bodyclose
logger.Error("failed to delete namespace", zap.Error(err))
@@ -462,7 +462,7 @@ func (c *CoordinatorClient) InitM3msgTopic(
) (admin.TopicGetResponse, error) {
url := c.makeURL(topic.InitURL)
logger := c.logger.With(
ZapMethod("initM3msgTopic"),
zapMethod("initM3msgTopic"),
zap.String("url", url),
zap.String("request", initRequest.String()),
zap.String("topic", fmt.Sprintf("%v", topicOpts)))
@@ -489,7 +489,7 @@ func (c *CoordinatorClient) GetM3msgTopic(
) (admin.TopicGetResponse, error) {
url := c.makeURL(topic.GetURL)
logger := c.logger.With(
ZapMethod("getM3msgTopic"), zap.String("url", url),
zapMethod("getM3msgTopic"), zap.String("url", url),
zap.String("topic", fmt.Sprintf("%v", topicOpts)))

resp, err := c.makeRequest(logger, url, topic.GetHTTPMethod, nil, m3msgTopicOptionsToMap(topicOpts))
@@ -516,7 +516,7 @@ func (c *CoordinatorClient) AddM3msgTopicConsumer(
) (admin.TopicGetResponse, error) {
url := c.makeURL(topic.AddURL)
logger := c.logger.With(
ZapMethod("addM3msgTopicConsumer"),
zapMethod("addM3msgTopicConsumer"),
zap.String("url", url),
zap.String("request", addRequest.String()),
zap.String("topic", fmt.Sprintf("%v", topicOpts)))
@@ -557,7 +557,7 @@ func (c *CoordinatorClient) WriteCarbon(
url string, metric string, v float64, t time.Time,
) error {
logger := c.logger.With(
ZapMethod("writeCarbon"), zap.String("url", url),
zapMethod("writeCarbon"), zap.String("url", url),
zap.String("at time", time.Now().String()),
zap.String("at ts", t.String()))

@@ -623,7 +623,7 @@ func (c *CoordinatorClient) WritePromWithRequest(writeRequest prompb.WriteReques
url := c.makeURL("api/v1/prom/remote/write")

logger := c.logger.With(
ZapMethod("writeProm"), zap.String("url", url),
zapMethod("writeProm"), zap.String("url", url),
zap.String("request", writeRequest.String()))

body, err := proto.Marshal(&writeRequest)
@@ -697,7 +697,7 @@ func (c *CoordinatorClient) ApplyKVUpdate(update string) error {
url := c.makeURL("api/v1/kvstore")

logger := c.logger.With(
ZapMethod("ApplyKVUpdate"), zap.String("url", url),
zapMethod("ApplyKVUpdate"), zap.String("url", url),
zap.String("update", update))

data := bytes.NewBuffer([]byte(update))
@@ -731,7 +731,7 @@ func (c *CoordinatorClient) query(
) error {
url := c.makeURL(query)
logger := c.logger.With(
ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
zapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
logger.Info("running")
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
if err != nil {
@@ -962,7 +962,7 @@ func (c *CoordinatorClient) runQuery(
) (string, error) {
url := c.makeURL(query)
logger := c.logger.With(
ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
zapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
logger.Info("running")
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
if err != nil {
@@ -1000,7 +1000,7 @@ func (c *CoordinatorClient) runQuery(
func (c *CoordinatorClient) RunQuery(
verifier ResponseVerifier, query string, headers map[string][]string,
) error {
logger := c.logger.With(ZapMethod("runQuery"),
logger := c.logger.With(zapMethod("runQuery"),
zap.String("query", query))
err := c.retryFunc(func() error {
err := c.query(verifier, query, headers)
@@ -1067,7 +1067,7 @@ func (c *CoordinatorClient) GraphiteQuery(

url := c.makeURL(queryStr)
logger := c.logger.With(
ZapMethod("graphiteQuery"), zap.String("url", url))
zapMethod("graphiteQuery"), zap.String("url", url))
logger.Info("running")
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
if err != nil {
2 changes: 0 additions & 2 deletions src/integration/resources/docker/dockerexternal/etcd.go
Original file line number Diff line number Diff line change
@@ -175,8 +175,6 @@ func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) {
// This is coming from the equivalent of docker inspect <container_id>
portBinds := container.NetworkSettings.Ports["2379/tcp"]

// If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine.
// See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16
ipAddr := "127.0.0.1"
_, err = net.ResolveIPAddr("ip4", "host.docker.internal")
if err == nil {
14 changes: 14 additions & 0 deletions src/integration/resources/inprocess/aggregator.go
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ import (
m3agg "github.com/m3db/m3/src/aggregator/aggregator"
"github.com/m3db/m3/src/aggregator/server"
"github.com/m3db/m3/src/aggregator/tools/deploy"
etcdclient "github.com/m3db/m3/src/cluster/client/etcd"
"github.com/m3db/m3/src/cmd/services/m3aggregator/config"
"github.com/m3db/m3/src/integration/resources"
nettest "github.com/m3db/m3/src/integration/resources/net"
@@ -63,12 +64,16 @@ type Aggregator struct {

// AggregatorOptions are options of starting an in-process aggregator.
type AggregatorOptions struct {
// EtcdEndpoints are the endpoints this aggregator should use to connect to etcd.
EtcdEndpoints []string

// Logger is the logger to use for the in-process aggregator.
Logger *zap.Logger
// StartFn is a custom function that can be used to start the Aggregator.
StartFn AggregatorStartFn
// Start indicates whether to start the aggregator instance
Start bool

// GeneratePorts will automatically update the config to use open ports
// if set to true. If false, configuration is used as-is re: ports.
GeneratePorts bool
@@ -286,6 +291,10 @@ func updateAggregatorConfig(
}
}

kvCfg := cfg.KVClientOrDefault()
cfg.KVClient = &kvCfg
updateEtcdEndpoints(opts.EtcdEndpoints, cfg.KVClient.Etcd)

// Replace any filepath with a temporary directory
cfg, tmpDirs, err = updateAggregatorFilepaths(cfg)
if err != nil {
@@ -295,6 +304,11 @@ func updateAggregatorConfig(
return cfg, tmpDirs, nil
}

func updateEtcdEndpoints(etcdEndpoints []string, etcdCfg *etcdclient.Configuration) {
etcdCfg.ETCDClusters[0].Endpoints = etcdEndpoints
etcdCfg.ETCDClusters[0].AutoSyncInterval = -1
}

func updateAggregatorHostID(cfg config.Configuration) config.Configuration {
hostID := uuid.New().String()
aggCfg := cfg.AggregatorOrDefault()
166 changes: 97 additions & 69 deletions src/integration/resources/inprocess/cluster.go
Original file line number Diff line number Diff line change
@@ -21,11 +21,12 @@
package inprocess

import (
"context"
"errors"
"fmt"
"net"
"strconv"
"time"

etcdclient "github.com/m3db/m3/src/cluster/client/etcd"
aggcfg "github.com/m3db/m3/src/cmd/services/m3aggregator/config"
dbcfg "github.com/m3db/m3/src/cmd/services/m3dbnode/config"
coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config"
@@ -34,13 +35,15 @@ import (
"github.com/m3db/m3/src/dbnode/environment"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/integration/resources"
nettest "github.com/m3db/m3/src/integration/resources/net"
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
"github.com/m3db/m3/src/query/storage/m3"
xconfig "github.com/m3db/m3/src/x/config"
"github.com/m3db/m3/src/x/config/hostid"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

"github.com/google/uuid"
"github.com/ory/dockertest/v3"
"go.uber.org/zap"
"gopkg.in/yaml.v2"
)
@@ -122,17 +125,51 @@ func NewClusterConfigsFromConfigFile(
// NewClusterConfigsFromYAML creates a new ClusterConfigs object from YAML strings
// representing component configs.
func NewClusterConfigsFromYAML(dbnodeYaml string, coordYaml string, aggYaml string) (ClusterConfigs, error) {
var dbCfg dbcfg.Configuration
// "db":
// discovery:
// "config":
// "service":
// "etcdClusters":
// - "endpoints": ["http://127.0.0.1:2379"]
// "zone": "embedded"
// "service": "m3db"
// "zone": "embedded"
// "env": "default_env"
etcdClientCfg := &etcdclient.Configuration{
Zone: "embedded",
Env: "default_env",
Service: "m3db",
ETCDClusters: []etcdclient.ClusterConfig{{
Zone: "embedded",
Endpoints: []string{"http://127.0.0.1:2379"},
}},
}
var dbCfg = dbcfg.Configuration{
DB: &dbcfg.DBConfiguration{
Discovery: &discovery.Configuration{
Config: &environment.Configuration{
Services: environment.DynamicConfiguration{{
Service: etcdClientCfg,
}},
},
},
},
}
if err := yaml.Unmarshal([]byte(dbnodeYaml), &dbCfg); err != nil {
return ClusterConfigs{}, err
}

var coordCfg coordinatorcfg.Configuration
var coordCfg = coordinatorcfg.Configuration{
ClusterManagement: coordinatorcfg.ClusterManagementConfiguration{
Etcd: etcdClientCfg,
},
}
if err := yaml.Unmarshal([]byte(coordYaml), &coordCfg); err != nil {
return ClusterConfigs{}, err
}

var aggCfg aggcfg.Configuration
var aggCfg = aggcfg.Configuration{}

if aggYaml != "" {
if err := yaml.Unmarshal([]byte(aggYaml), &aggCfg); err != nil {
return ClusterConfigs{}, err
@@ -164,7 +201,7 @@ func NewCluster(
func NewClusterFromSpecification(
specs ClusterSpecification,
opts resources.ClusterOptions,
) (resources.M3Resources, error) {
) (_ resources.M3Resources, finalErr error) {
if err := opts.Validate(); err != nil {
return nil, err
}
@@ -175,6 +212,7 @@ func NewClusterFromSpecification(
}

var (
etcd *dockerexternal.EtcdNode
coord resources.Coordinator
nodes = make(resources.Nodes, 0, len(specs.Configs.DBNodes))
aggs = make(resources.Aggregators, 0, len(specs.Configs.Aggregators))
@@ -185,13 +223,38 @@ func NewClusterFromSpecification(
// Ensure that once we start creating resources, they all get cleaned up even if the function
// fails half way.
defer func() {
if err != nil {
cleanup(logger, nodes, coord, aggs)
if finalErr != nil {
cleanup(logger, etcd, nodes, coord, aggs)
}
}()

etcdEndpoints := opts.EtcdEndpoints
if len(opts.EtcdEndpoints) == 0 {
// TODO: amainsd: maybe not the cleanest place to do this.
pool, err := dockertest.NewPool("")
if err != nil {
return nil, err
}
etcd, err = dockerexternal.NewEtcd(pool, instrument.NewOptions())
if err != nil {
return nil, err
}

// TODO(amains): etcd *needs* to be setup before the coordinator, because ConfigurePlacementsForAggregation spins
// up a dedicated coordinator for some reason. Either clean this up or just accept it.
if err := etcd.Setup(context.TODO()); err != nil {
return nil, err
}
etcdEndpoints = []string{fmt.Sprintf(etcd.Address())}
}

updateEtcdEndpoints := func(etcdCfg *etcdclient.Configuration) {
etcdCfg.ETCDClusters[0].Endpoints = etcdEndpoints
etcdCfg.ETCDClusters[0].AutoSyncInterval = -1
}
for i := 0; i < len(specs.Configs.DBNodes); i++ {
var node resources.Node
updateEtcdEndpoints(specs.Configs.DBNodes[i].DB.Discovery.Config.Services[0].Service)
node, err = NewDBNode(specs.Configs.DBNodes[i], specs.Options.DBNode[i])
if err != nil {
return nil, err
@@ -204,13 +267,15 @@ func NewClusterFromSpecification(
agg, err = NewAggregator(aggCfg, AggregatorOptions{
GeneratePorts: true,
GenerateHostID: false,
EtcdEndpoints: etcdEndpoints,
})
if err != nil {
return nil, err
}
aggs = append(aggs, agg)
}

updateEtcdEndpoints(specs.Configs.Coordinator.ClusterManagement.Etcd)
coord, err = NewCoordinator(
specs.Configs.Coordinator,
CoordinatorOptions{GeneratePorts: opts.Coordinator.GeneratePorts},
@@ -220,14 +285,15 @@ func NewClusterFromSpecification(
}

if err = ConfigurePlacementsForAggregation(nodes, coord, aggs, specs, opts); err != nil {
return nil, err
return nil, fmt.Errorf("failed to setup placements for aggregation: %w", err)
}

// Start all the configured resources.
m3 := NewM3Resources(ResourceOptions{
Coordinator: coord,
DBNodes: nodes,
Aggregators: aggs,
Etcd: etcd,
})
m3.Start()

@@ -371,13 +437,13 @@ func GenerateDBNodeConfigsForCluster(
// the etcd server (i.e. seed node).
hostID := uuid.NewString()
defaultDBNodesCfg := configs.DBNode
discoveryCfg, envConfig, err := generateDefaultDiscoveryConfig(
defaultDBNodesCfg,
hostID,
generatePortsAndIDs)
if err != nil {
return nil, nil, environment.Configuration{}, err

if configs.DBNode.DB.Discovery == nil {
return nil, nil, environment.Configuration{}, errors.New(
"configuration must specify at least `discovery`" +
" in order to construct an etcd client")
}
discoveryCfg, envConfig := configs.DBNode.DB.Discovery, configs.DBNode.DB.Discovery.Config

var (
defaultDBNodeOpts = DBNodeOptions{
@@ -389,8 +455,7 @@ func GenerateDBNodeConfigsForCluster(
nodeOpts = make([]DBNodeOptions, 0, numNodes)
)
for i := 0; i < int(numNodes); i++ {
var cfg dbcfg.Configuration
cfg, err = defaultDBNodesCfg.DeepCopy()
cfg, err := defaultDBNodesCfg.DeepCopy()
if err != nil {
return nil, nil, environment.Configuration{}, err
}
@@ -404,68 +469,31 @@ func GenerateDBNodeConfigsForCluster(
Value: &hostID,
}
}
cfg.DB.Discovery = &discoveryCfg
cfg.DB.Discovery = discoveryCfg

cfgs = append(cfgs, cfg)
nodeOpts = append(nodeOpts, dbnodeOpts)
}

return cfgs, nodeOpts, envConfig, nil
return cfgs, nodeOpts, *envConfig, nil
}

// generateDefaultDiscoveryConfig handles creating the correct config
// for having an embedded ETCD server with the correct server and
// client configuration.
func generateDefaultDiscoveryConfig(
cfg dbcfg.Configuration,
hostID string,
generateETCDPorts bool,
) (discovery.Configuration, environment.Configuration, error) {
discoveryConfig := cfg.DB.DiscoveryOrDefault()
envConfig, err := discoveryConfig.EnvironmentConfig(hostID)
if err != nil {
return discovery.Configuration{}, environment.Configuration{}, err
}

var (
etcdClientPort = dbcfg.DefaultEtcdClientPort
etcdServerPort = dbcfg.DefaultEtcdServerPort
)
if generateETCDPorts {
etcdClientPort, err = nettest.GetAvailablePort()
if err != nil {
return discovery.Configuration{}, environment.Configuration{}, err
}

etcdServerPort, err = nettest.GetAvailablePort()
if err != nil {
return discovery.Configuration{}, environment.Configuration{}, err
}
}
func cleanup(
logger *zap.Logger,
etcd *dockerexternal.EtcdNode,
nodes resources.Nodes,
coord resources.Coordinator,
aggs resources.Aggregators,
) {
var multiErr xerrors.MultiError

etcdServerURL := fmt.Sprintf("http://0.0.0.0:%d", etcdServerPort)
etcdClientAddr := net.JoinHostPort("0.0.0.0", strconv.Itoa(etcdClientPort))
etcdClientURL := fmt.Sprintf("http://0.0.0.0:%d", etcdClientPort)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

envConfig.SeedNodes.InitialCluster[0].Endpoint = etcdServerURL
envConfig.SeedNodes.InitialCluster[0].HostID = hostID
envConfig.Services[0].Service.ETCDClusters[0].Endpoints = []string{etcdClientAddr}
if generateETCDPorts {
envConfig.SeedNodes.ListenPeerUrls = []string{etcdServerURL}
envConfig.SeedNodes.ListenClientUrls = []string{etcdClientURL}
envConfig.SeedNodes.InitialAdvertisePeerUrls = []string{etcdServerURL}
envConfig.SeedNodes.AdvertiseClientUrls = []string{etcdClientURL}
if etcd != nil {
multiErr = multiErr.Add(etcd.Close(ctx))
}

configType := discovery.ConfigType
return discovery.Configuration{
Type: &configType,
Config: &envConfig,
}, envConfig, nil
}

func cleanup(logger *zap.Logger, nodes resources.Nodes, coord resources.Coordinator, aggs resources.Aggregators) {
var multiErr xerrors.MultiError
for _, n := range nodes {
multiErr = multiErr.Add(n.Close())
}
16 changes: 16 additions & 0 deletions src/integration/resources/inprocess/coordinator_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build test_harness
// +build test_harness

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -22,24 +24,38 @@
package inprocess

import (
"context"
"testing"

"github.com/m3db/m3/src/cluster/generated/proto/placementpb"
"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
"github.com/m3db/m3/src/msg/generated/proto/topicpb"
"github.com/m3db/m3/src/msg/topic"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/generated/proto/prompb"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"
"github.com/ory/dockertest/v3"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewCoordinator(t *testing.T) {
pool, err := dockertest.NewPool("")
require.NoError(t, err)

etcd, err := dockerexternal.NewEtcd(pool, instrument.NewOptions(), dockerexternal.EtcdClusterPort(2379))
require.NoError(t, err)
require.NoError(t, etcd.Setup(context.TODO()))
t.Cleanup(func() {
require.NoError(t, etcd.Close(context.TODO()))
})

dbnode, err := NewDBNodeFromYAML(defaultDBNodeConfig, DBNodeOptions{Start: true})
require.NoError(t, err)
defer func() {
2 changes: 2 additions & 0 deletions src/integration/resources/inprocess/dbnode_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build test_harness
// +build test_harness

// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
11 changes: 11 additions & 0 deletions src/integration/resources/inprocess/inprocess.go
Original file line number Diff line number Diff line change
@@ -21,19 +21,24 @@
package inprocess

import (
"context"

"github.com/m3db/m3/src/integration/resources"
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
"github.com/m3db/m3/src/x/errors"
)

type inprocessM3Resources struct {
coordinator resources.Coordinator
dbNodes resources.Nodes
aggregators resources.Aggregators
etcd *dockerexternal.EtcdNode
}

// ResourceOptions are the options for creating new
// resources.M3Resources.
type ResourceOptions struct {
Etcd *dockerexternal.EtcdNode
Coordinator resources.Coordinator
DBNodes resources.Nodes
Aggregators resources.Aggregators
@@ -43,6 +48,7 @@ type ResourceOptions struct {
// backed by in-process implementations of the M3 components.
func NewM3Resources(options ResourceOptions) resources.M3Resources {
return &inprocessM3Resources{
etcd: options.Etcd,
coordinator: options.Coordinator,
dbNodes: options.DBNodes,
aggregators: options.Aggregators,
@@ -73,6 +79,11 @@ func (i *inprocessM3Resources) Cleanup() error {
err = err.Add(a.Close())
}

if i.etcd != nil {
ctx := context.TODO()
err = err.Add(i.etcd.Close(ctx))
}

return err.FinalError()
}

23 changes: 23 additions & 0 deletions src/integration/resources/options.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
// Copyright (c) 2022 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package resources

import (
@@ -8,6 +28,9 @@ import (
// ClusterOptions contains options for spinning up a new M3 cluster
// composed of in-process components.
type ClusterOptions struct {
// EtcdEndpoints if provided, will be used directly instead of spinning up a dedicated etcd node for the cluster.
// By default, NewClusterFromSpecification will spin up and manage an etcd node itself.
EtcdEndpoints []string
// DBNode contains cluster options for spinning up dbnodes.
DBNode *DBNodeClusterOptions
// Aggregator is the optional cluster options for spinning up aggregators.

0 comments on commit 135cdd8

Please sign in to comment.