From b01c2c42d3fbec45d0aced144653f06f1043d64a Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 10 Dec 2024 12:37:29 +0100 Subject: [PATCH 1/9] Use a naive approach to parse port before env var expansion --- .../fix-metrics-service-address-env-var.yaml | 18 +++++++++++++ apis/v1beta1/collector_webhook_test.go | 12 ++++++++- apis/v1beta1/config.go | 26 +++++++++++++++++-- internal/manifests/collector/service.go | 3 +-- 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 .chloggen/fix-metrics-service-address-env-var.yaml diff --git a/.chloggen/fix-metrics-service-address-env-var.yaml b/.chloggen/fix-metrics-service-address-env-var.yaml new file mode 100644 index 0000000000..aaaaabaca9 --- /dev/null +++ b/.chloggen/fix-metrics-service-address-env-var.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action) +component: operator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix the admission webhook to when metrics service address host uses env var expansion + +# One or more tracking issues related to the change +issues: [3513] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This should allow the metrics service address to have the host portion expanded from an environment variable, + like `$(env:POD_IP)` instead of using `0.0.0.0`, which is the [recommended by the Collector](https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/security-best-practices.md#safeguards-against-denial-of-service-attacks). diff --git a/apis/v1beta1/collector_webhook_test.go b/apis/v1beta1/collector_webhook_test.go index 8604b91b3e..b2ce49bae6 100644 --- a/apis/v1beta1/collector_webhook_test.go +++ b/apis/v1beta1/collector_webhook_test.go @@ -588,7 +588,17 @@ func TestOTELColValidatingWebhook(t *testing.T) { five := int32(5) maxInt := int32(math.MaxInt32) - cfg := v1beta1.Config{} + cfg := v1beta1.Config{ + Service: v1beta1.Service{ + Telemetry: &v1beta1.AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "${env:POD_ID}:8888", + }, + }, + }, + }, + } err := yaml.Unmarshal([]byte(cfgYaml), &cfg) require.NoError(t, err) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 5cb9150513..077ade9869 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -427,16 +427,18 @@ type Service struct { Pipelines map[string]*Pipeline `json:"pipelines" yaml:"pipelines"` } +const defaultServicePort int32 = 8888 + // MetricsEndpoint gets the port number and host address for the metrics endpoint from the collector config if it has been set. func (s *Service) MetricsEndpoint() (string, int32, error) { defaultAddr := "0.0.0.0" if s.GetTelemetry() == nil { // telemetry isn't set, use the default - return defaultAddr, 8888, nil + return defaultAddr, defaultServicePort, nil } host, port, netErr := net.SplitHostPort(s.GetTelemetry().Metrics.Address) if netErr != nil && strings.Contains(netErr.Error(), "missing port in address") { - return defaultAddr, 8888, nil + return defaultAddr, defaultServicePort, nil } else if netErr != nil { return "", 0, netErr } @@ -452,6 +454,26 @@ func (s *Service) MetricsEndpoint() (string, int32, error) { return host, int32(i64), nil } +// NaivePort attempts gets the port number from the host address without doing any validation regarding the address itself. +// It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon +// from the env var, i.e. the address looks like "${env:POD_IP}:4317" or "${env:POD_IP}". +// It does not work in cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}". +func (s *Service) NaivePort() (int32, error) { + telemetry := s.GetTelemetry() + if telemetry == nil { + return defaultServicePort, nil + } + splitAddress := strings.Split(telemetry.Metrics.Address, ":") + if len(splitAddress) == 1 { + return defaultServicePort, nil + } + port, err := strconv.Atoi(splitAddress[len(splitAddress)-1]) + if err != nil { + return 0, err + } + return int32(port), nil +} + // ApplyDefaults inserts configuration defaults if it has not been set. func (s *Service) ApplyDefaults() error { telemetryAddr, telemetryPort, err := s.MetricsEndpoint() diff --git a/internal/manifests/collector/service.go b/internal/manifests/collector/service.go index 7e27eb752c..726238bf94 100644 --- a/internal/manifests/collector/service.go +++ b/internal/manifests/collector/service.go @@ -73,7 +73,6 @@ func HeadlessService(params manifests.Params) (*corev1.Service, error) { } func MonitoringService(params manifests.Params) (*corev1.Service, error) { - name := naming.MonitoringService(params.OtelCol.Name) labels := manifestutils.Labels(params.OtelCol.ObjectMeta, name, params.OtelCol.Spec.Image, ComponentOpenTelemetryCollector, []string{}) labels[monitoringLabel] = valueExists @@ -84,7 +83,7 @@ func MonitoringService(params manifests.Params) (*corev1.Service, error) { return nil, err } - _, metricsPort, err := params.OtelCol.Spec.Config.Service.MetricsEndpoint() + metricsPort, err := params.OtelCol.Spec.Config.Service.NaivePort() if err != nil { return nil, err } From 8fbfa7c3183eec321b119eff46c93b2bcd0a46b3 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:57:54 +0100 Subject: [PATCH 2/9] Refactor how service metrics endpoint parsing works Now, when there would be an error it gets logged and the default values are returned. With this refactor the method encapsulates all defaulting logic that was slightly spread around different places. --- apis/v1beta1/collector_webhook_test.go | 2 +- apis/v1beta1/config.go | 62 +++++++++-------------- apis/v1beta1/config_test.go | 9 ++-- internal/manifests/collector/container.go | 6 +-- internal/manifests/collector/service.go | 5 +- pkg/collector/upgrade/v0_111_0.go | 4 +- 6 files changed, 33 insertions(+), 55 deletions(-) diff --git a/apis/v1beta1/collector_webhook_test.go b/apis/v1beta1/collector_webhook_test.go index b2ce49bae6..718b7d567a 100644 --- a/apis/v1beta1/collector_webhook_test.go +++ b/apis/v1beta1/collector_webhook_test.go @@ -555,7 +555,7 @@ func TestCollectorDefaultingWebhook(t *testing.T) { ctx := context.Background() err := cvw.Default(ctx, &test.otelcol) if test.expected.Spec.Config.Service.Telemetry == nil { - assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(), "could not apply defaults") + assert.NoError(t, test.expected.Spec.Config.Service.ApplyDefaults(logr.Discard()), "could not apply defaults") } assert.NoError(t, err) assert.Equal(t, test.expected, test.otelcol) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 077ade9869..43ab9e4dee 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -18,7 +18,6 @@ import ( "bytes" "encoding/json" "fmt" - "net" "reflect" "sort" "strconv" @@ -269,7 +268,7 @@ func (c *Config) getEnvironmentVariablesForComponentKinds(logger logr.Logger, co // applyDefaultForComponentKinds applies defaults to the endpoints for the given ComponentKind(s). func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error { - if err := c.Service.ApplyDefaults(); err != nil { + if err := c.Service.ApplyDefaults(logger); err != nil { return err } enabledComponents := c.GetEnabledComponents() @@ -427,59 +426,46 @@ type Service struct { Pipelines map[string]*Pipeline `json:"pipelines" yaml:"pipelines"` } -const defaultServicePort int32 = 8888 +type serviceParseError string -// MetricsEndpoint gets the port number and host address for the metrics endpoint from the collector config if it has been set. -func (s *Service) MetricsEndpoint() (string, int32, error) { - defaultAddr := "0.0.0.0" - if s.GetTelemetry() == nil { - // telemetry isn't set, use the default - return defaultAddr, defaultServicePort, nil - } - host, port, netErr := net.SplitHostPort(s.GetTelemetry().Metrics.Address) - if netErr != nil && strings.Contains(netErr.Error(), "missing port in address") { - return defaultAddr, defaultServicePort, nil - } else if netErr != nil { - return "", 0, netErr - } - i64, err := strconv.ParseInt(port, 10, 32) - if err != nil { - return "", 0, err - } - - if host == "" { - host = defaultAddr - } - - return host, int32(i64), nil +func (s serviceParseError) Error() string { + return string(s) } -// NaivePort attempts gets the port number from the host address without doing any validation regarding the address itself. +const ( + defaultServicePort int32 = 8888 + defaultServiceHost = "0.0.0.0" +) + +// MetricsEndpoint attempts gets the host and port number from the host address without doing any validation regarding the +// address itself. // It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon // from the env var, i.e. the address looks like "${env:POD_IP}:4317" or "${env:POD_IP}". // It does not work in cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}". -func (s *Service) NaivePort() (int32, error) { +// It does not work for IPv6 hostnames/addresses. +func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32) { telemetry := s.GetTelemetry() if telemetry == nil { - return defaultServicePort, nil + return defaultServiceHost, defaultServicePort } splitAddress := strings.Split(telemetry.Metrics.Address, ":") if len(splitAddress) == 1 { - return defaultServicePort, nil + return defaultServiceHost, defaultServicePort } - port, err := strconv.Atoi(splitAddress[len(splitAddress)-1]) + port, err := strconv.ParseInt(splitAddress[len(splitAddress)-1], 10, 32) if err != nil { - return 0, err + errMsg := fmt.Sprintf("couldn't determine metrics port from configuration, using default: %s:%d", + defaultServiceHost, defaultServicePort) + logger.Info(errMsg, "error", err) + return defaultServiceHost, defaultServicePort } - return int32(port), nil + host := strings.Join(splitAddress[0:len(splitAddress)-1], ":") + return host, int32(port) } // ApplyDefaults inserts configuration defaults if it has not been set. -func (s *Service) ApplyDefaults() error { - telemetryAddr, telemetryPort, err := s.MetricsEndpoint() - if err != nil { - return err - } +func (s *Service) ApplyDefaults(logger logr.Logger) error { + telemetryAddr, telemetryPort := s.MetricsEndpoint(logger) tm := &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ diff --git a/apis/v1beta1/config_test.go b/apis/v1beta1/config_test.go index b9c288f692..8ab1c4916d 100644 --- a/apis/v1beta1/config_test.go +++ b/apis/v1beta1/config_test.go @@ -216,8 +216,7 @@ func TestGetTelemetryFromYAMLIsNil(t *testing.T) { assert.Nil(t, cfg.Service.GetTelemetry()) } -func TestConfigToMetricsPort(t *testing.T) { - +func TestConfigMetricsEndpoint(t *testing.T) { for _, tt := range []struct { desc string expectedAddr string @@ -239,7 +238,7 @@ func TestConfigToMetricsPort(t *testing.T) { }, }, { - "bad address", + "missing port", "0.0.0.0", 8888, Service{ @@ -296,9 +295,9 @@ func TestConfigToMetricsPort(t *testing.T) { }, } { t.Run(tt.desc, func(t *testing.T) { + logger := logr.Discard() // these are acceptable failures, we return to the collector's default metric port - addr, port, err := tt.config.MetricsEndpoint() - assert.NoError(t, err) + addr, port := tt.config.MetricsEndpoint(logger) assert.Equal(t, tt.expectedAddr, addr) assert.Equal(t, tt.expectedPort, port) }) diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index f499f08c55..df54f10026 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -229,11 +229,7 @@ func getConfigContainerPorts(logger logr.Logger, conf v1beta1.Config) (map[strin } } - _, metricsPort, err := conf.Service.MetricsEndpoint() - if err != nil { - logger.Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err) - metricsPort = 8888 - } + _, metricsPort := conf.Service.MetricsEndpoint(logger) ports["metrics"] = corev1.ContainerPort{ Name: "metrics", ContainerPort: metricsPort, diff --git a/internal/manifests/collector/service.go b/internal/manifests/collector/service.go index 726238bf94..8470b9505c 100644 --- a/internal/manifests/collector/service.go +++ b/internal/manifests/collector/service.go @@ -83,10 +83,7 @@ func MonitoringService(params manifests.Params) (*corev1.Service, error) { return nil, err } - metricsPort, err := params.OtelCol.Spec.Config.Service.NaivePort() - if err != nil { - return nil, err - } + _, metricsPort := params.OtelCol.Spec.Config.Service.MetricsEndpoint(params.Log) return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/collector/upgrade/v0_111_0.go b/pkg/collector/upgrade/v0_111_0.go index 5ba22efea0..3a508f59e4 100644 --- a/pkg/collector/upgrade/v0_111_0.go +++ b/pkg/collector/upgrade/v0_111_0.go @@ -18,6 +18,6 @@ import ( "github.com/open-telemetry/opentelemetry-operator/apis/v1beta1" ) -func upgrade0_111_0(_ VersionUpgrade, otelcol *v1beta1.OpenTelemetryCollector) (*v1beta1.OpenTelemetryCollector, error) { //nolint:unparam - return otelcol, otelcol.Spec.Config.Service.ApplyDefaults() +func upgrade0_111_0(u VersionUpgrade, otelcol *v1beta1.OpenTelemetryCollector) (*v1beta1.OpenTelemetryCollector, error) { //nolint:unparam + return otelcol, otelcol.Spec.Config.Service.ApplyDefaults(u.Log) } From dbd276e92f061718bfdb1b180f30e8dad5dcb452 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 10 Dec 2024 17:58:08 +0100 Subject: [PATCH 3/9] Add more tests to `Service.MetricsEndpoint` and fix them --- apis/v1beta1/config.go | 20 ++++++++++++-------- apis/v1beta1/config_test.go | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 43ab9e4dee..c8e7ea436c 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "reflect" + "regexp" "sort" "strconv" "strings" @@ -440,26 +441,29 @@ const ( // MetricsEndpoint attempts gets the host and port number from the host address without doing any validation regarding the // address itself. // It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon -// from the env var, i.e. the address looks like "${env:POD_IP}:4317" or "${env:POD_IP}". +// from the env var, i.e. the address looks like "${env:POD_IP}:4317", "${env:POD_IP}", or "${POD_IP}". // It does not work in cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}". -// It does not work for IPv6 hostnames/addresses. +// It does not work for IPv6 addresses. func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32) { telemetry := s.GetTelemetry() - if telemetry == nil { + if telemetry == nil || telemetry.Metrics.Address == "" { return defaultServiceHost, defaultServicePort } - splitAddress := strings.Split(telemetry.Metrics.Address, ":") - if len(splitAddress) == 1 { - return defaultServiceHost, defaultServicePort + + explicitPortMatches := regexp.MustCompile(`:(\d+$)`).FindStringSubmatch(telemetry.Metrics.Address) + if len(explicitPortMatches) <= 1 { + return telemetry.Metrics.Address, defaultServicePort } - port, err := strconv.ParseInt(splitAddress[len(splitAddress)-1], 10, 32) + + port, err := strconv.ParseInt(explicitPortMatches[1], 10, 32) if err != nil { errMsg := fmt.Sprintf("couldn't determine metrics port from configuration, using default: %s:%d", defaultServiceHost, defaultServicePort) logger.Info(errMsg, "error", err) return defaultServiceHost, defaultServicePort } - host := strings.Join(splitAddress[0:len(splitAddress)-1], ":") + + host, _, _ := strings.Cut(telemetry.Metrics.Address, explicitPortMatches[0]) return host, int32(port) } diff --git a/apis/v1beta1/config_test.go b/apis/v1beta1/config_test.go index 8ab1c4916d..c425aefab4 100644 --- a/apis/v1beta1/config_test.go +++ b/apis/v1beta1/config_test.go @@ -225,13 +225,13 @@ func TestConfigMetricsEndpoint(t *testing.T) { }{ { "custom port", - "0.0.0.0", + "localhost", 9090, Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ - "address": "0.0.0.0:9090", + "address": "localhost:9090", }, }, }, @@ -239,13 +239,41 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, { "missing port", - "0.0.0.0", + "localhost", 8888, Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ - "address": "0.0.0.0", + "address": "localhost", + }, + }, + }, + }, + }, + { + "env var and missing port", + "${env:POD_IP}", + 8888, + Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "${env:POD_IP}", + }, + }, + }, + }, + }, + { + "env var and with port", + "${POD_IP}", + 1234, + Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "${POD_IP}:1234", }, }, }, From 354b2ac6ab155bf41f426684e7eef20526b9306d Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Tue, 10 Dec 2024 18:08:26 +0100 Subject: [PATCH 4/9] Remove unused code --- apis/v1beta1/config.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index c8e7ea436c..63d2eb185d 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -427,12 +427,6 @@ type Service struct { Pipelines map[string]*Pipeline `json:"pipelines" yaml:"pipelines"` } -type serviceParseError string - -func (s serviceParseError) Error() string { - return string(s) -} - const ( defaultServicePort int32 = 8888 defaultServiceHost = "0.0.0.0" From 43be39b617d73c6354e69baeee3609914a26d398 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Wed, 11 Dec 2024 15:09:06 +0100 Subject: [PATCH 5/9] Make Service.MetricsEndpoint fail when can't parse port --- apis/v1beta1/config.go | 32 +++-- apis/v1beta1/config_test.go | 153 +++++++++++++++++----- internal/manifests/collector/container.go | 7 +- internal/manifests/collector/service.go | 5 +- 4 files changed, 152 insertions(+), 45 deletions(-) diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 63d2eb185d..4cec32ac5d 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -436,34 +436,46 @@ const ( // address itself. // It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon // from the env var, i.e. the address looks like "${env:POD_IP}:4317", "${env:POD_IP}", or "${POD_IP}". -// It does not work in cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}". -// It does not work for IPv6 addresses. -func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32) { +// In cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}", this returns an error. +// It should work with IPv4 and IPv6 addresses. +func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32, error) { telemetry := s.GetTelemetry() if telemetry == nil || telemetry.Metrics.Address == "" { - return defaultServiceHost, defaultServicePort + return defaultServiceHost, defaultServicePort, nil + } + + isPortEnvVar := regexp.MustCompile(`:\${[env:]?.*}$`).MatchString(telemetry.Metrics.Address) + if isPortEnvVar { + errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s", + telemetry.Metrics.Address) + logger.Info(errMsg) + return "", 0, fmt.Errorf(errMsg) } explicitPortMatches := regexp.MustCompile(`:(\d+$)`).FindStringSubmatch(telemetry.Metrics.Address) if len(explicitPortMatches) <= 1 { - return telemetry.Metrics.Address, defaultServicePort + return telemetry.Metrics.Address, defaultServicePort, nil } port, err := strconv.ParseInt(explicitPortMatches[1], 10, 32) if err != nil { - errMsg := fmt.Sprintf("couldn't determine metrics port from configuration, using default: %s:%d", - defaultServiceHost, defaultServicePort) + errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s", + telemetry.Metrics.Address) logger.Info(errMsg, "error", err) - return defaultServiceHost, defaultServicePort + return "", 0, err } host, _, _ := strings.Cut(telemetry.Metrics.Address, explicitPortMatches[0]) - return host, int32(port) + return host, int32(port), nil } // ApplyDefaults inserts configuration defaults if it has not been set. func (s *Service) ApplyDefaults(logger logr.Logger) error { - telemetryAddr, telemetryPort := s.MetricsEndpoint(logger) + telemetryAddr, telemetryPort, err := s.MetricsEndpoint(logger) + if err != nil { + return err + } + tm := &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ diff --git a/apis/v1beta1/config_test.go b/apis/v1beta1/config_test.go index c425aefab4..cb631889ee 100644 --- a/apis/v1beta1/config_test.go +++ b/apis/v1beta1/config_test.go @@ -221,13 +221,14 @@ func TestConfigMetricsEndpoint(t *testing.T) { desc string expectedAddr string expectedPort int32 + expectedErr bool config Service }{ { - "custom port", - "localhost", - 9090, - Service{ + desc: "custom port", + expectedAddr: "localhost", + expectedPort: 9090, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -238,10 +239,24 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, }, { - "missing port", - "localhost", - 8888, - Service{ + desc: "custom port ipv6", + expectedAddr: "[::]", + expectedPort: 9090, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "[::]:9090", + }, + }, + }, + }, + }, + { + desc: "missing port", + expectedAddr: "localhost", + expectedPort: 8888, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -252,10 +267,24 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, }, { - "env var and missing port", - "${env:POD_IP}", - 8888, - Service{ + desc: "missing port ipv6", + expectedAddr: "[::]", + expectedPort: 8888, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "[::]", + }, + }, + }, + }, + }, + { + desc: "env var and missing port", + expectedAddr: "${env:POD_IP}", + expectedPort: 8888, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -266,10 +295,24 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, }, { - "env var and with port", - "${POD_IP}", - 1234, - Service{ + desc: "env var and missing port ipv6", + expectedAddr: "[${env:POD_IP}]", + expectedPort: 8888, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "[${env:POD_IP}]", + }, + }, + }, + }, + }, + { + desc: "env var and with port", + expectedAddr: "${POD_IP}", + expectedPort: 1234, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -280,10 +323,50 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, }, { - "missing address", - "0.0.0.0", - 8888, - Service{ + desc: "env var and with port ipv6", + expectedAddr: "[${POD_IP}]", + expectedPort: 1234, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "[${POD_IP}]:1234", + }, + }, + }, + }, + }, + { + desc: "port is env var", + expectedErr: true, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "localhost:${env:POD_PORT}", + }, + }, + }, + }, + }, + { + desc: "port is env var ipv6", + expectedErr: true, + config: Service{ + Telemetry: &AnyConfig{ + Object: map[string]interface{}{ + "metrics": map[string]interface{}{ + "address": "[::]:${env:POD_PORT}", + }, + }, + }, + }, + }, + { + desc: "missing address", + expectedAddr: "0.0.0.0", + expectedPort: 8888, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -294,24 +377,23 @@ func TestConfigMetricsEndpoint(t *testing.T) { }, }, { - "missing metrics", - "0.0.0.0", - 8888, - Service{ + desc: "missing metrics", + expectedAddr: "0.0.0.0", + expectedPort: 8888, + config: Service{ Telemetry: &AnyConfig{}, }, }, { - "missing telemetry", - "0.0.0.0", - 8888, - Service{}, + desc: "missing telemetry", + expectedAddr: "0.0.0.0", + expectedPort: 8888, }, { - "configured telemetry", - "1.2.3.4", - 4567, - Service{ + desc: "configured telemetry", + expectedAddr: "1.2.3.4", + expectedPort: 4567, + config: Service{ Telemetry: &AnyConfig{ Object: map[string]interface{}{ "metrics": map[string]interface{}{ @@ -325,7 +407,12 @@ func TestConfigMetricsEndpoint(t *testing.T) { t.Run(tt.desc, func(t *testing.T) { logger := logr.Discard() // these are acceptable failures, we return to the collector's default metric port - addr, port := tt.config.MetricsEndpoint(logger) + addr, port, err := tt.config.MetricsEndpoint(logger) + if tt.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } assert.Equal(t, tt.expectedAddr, addr) assert.Equal(t, tt.expectedPort, port) }) diff --git a/internal/manifests/collector/container.go b/internal/manifests/collector/container.go index df54f10026..5d96258f1d 100644 --- a/internal/manifests/collector/container.go +++ b/internal/manifests/collector/container.go @@ -229,7 +229,12 @@ func getConfigContainerPorts(logger logr.Logger, conf v1beta1.Config) (map[strin } } - _, metricsPort := conf.Service.MetricsEndpoint(logger) + _, metricsPort, err := conf.Service.MetricsEndpoint(logger) + if err != nil { + logger.Info("couldn't determine metrics port from configuration, using 8888 default value", "error", err) + metricsPort = 8888 + } + ports["metrics"] = corev1.ContainerPort{ Name: "metrics", ContainerPort: metricsPort, diff --git a/internal/manifests/collector/service.go b/internal/manifests/collector/service.go index 8470b9505c..0d2b98eac1 100644 --- a/internal/manifests/collector/service.go +++ b/internal/manifests/collector/service.go @@ -83,7 +83,10 @@ func MonitoringService(params manifests.Params) (*corev1.Service, error) { return nil, err } - _, metricsPort := params.OtelCol.Spec.Config.Service.MetricsEndpoint(params.Log) + _, metricsPort, err := params.OtelCol.Spec.Config.Service.MetricsEndpoint(params.Log) + if err != nil { + return nil, err + } return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ From c525a26f1fa92c4e75cf51f38165452ff918b1ea Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Wed, 11 Dec 2024 16:41:40 +0100 Subject: [PATCH 6/9] Update documentation regarding examination of the collector config file --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6244ab90cf..865f621203 100644 --- a/README.md +++ b/README.md @@ -72,12 +72,14 @@ This will create an OpenTelemetry Collector instance named `simplest`, exposing The `config` node holds the `YAML` that should be passed down as-is to the underlying OpenTelemetry Collector instances. Refer to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) documentation for a reference of the possible entries. -> 🚨 **NOTE:** At this point, the Operator does _not_ validate the contents of the configuration file: if the configuration is invalid, the instance will still be created but the underlying OpenTelemetry Collector might crash. - > 🚨 **Note:** For private GKE clusters, you will need to either add a firewall rule that allows master nodes access to port `9443/tcp` on worker nodes, or change the existing rule that allows access to port `80/tcp`, `443/tcp` and `10254/tcp` to also allow access to port `9443/tcp`. More information can be found in the [Official GCP Documentation](https://cloud.google.com/load-balancing/docs/tcp/setting-up-tcp#config-hc-firewall). See the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#add_firewall_rules) on adding rules and the [Kubernetes issue](https://github.com/kubernetes/kubernetes/issues/79739) for more detail. -The Operator does examine the configuration file to discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. +The Operator does examine the configuration file for a few purposes: + +- To discover configured receivers, services and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. To ignore service creation set `observability.metrics.enableMetric` to `false`. You can create the Service objects yourself if you need them. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. +- To check if Collector observability is enabled (controlled by `spec.observability.metrics.enableMetrics`). In this case, a Service and ServiceMonitor/PodMonitor are created for the Collector instance. As a consequence, if the metrics service address contains an invalid port or uses environment variable expansion for the port, an error will be returned. A workaround for the environment variable case is to set `enableMetrics` to `false` and manually create the previously mentioned objects with the correct port if you need them. + ### Upgrades As noted above, the OpenTelemetry Collector format is continuing to evolve. However, a best-effort attempt is made to upgrade all managed `OpenTelemetryCollector` resources. From 62e7f1e2151e095f855831e514ec69314fce3f61 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Wed, 11 Dec 2024 17:07:47 +0100 Subject: [PATCH 7/9] Fix documentation regarding configured receivers and their ports --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 865f621203..b21350aa56 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ The `config` node holds the `YAML` that should be passed down as-is to the under The Operator does examine the configuration file for a few purposes: -- To discover configured receivers, services and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. To ignore service creation set `observability.metrics.enableMetric` to `false`. You can create the Service objects yourself if you need them. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. +- To discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. To ignore service creation set `observability.metrics.enableMetric` to `false`. You can create the Service objects yourself if you need them. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. - To check if Collector observability is enabled (controlled by `spec.observability.metrics.enableMetrics`). In this case, a Service and ServiceMonitor/PodMonitor are created for the Collector instance. As a consequence, if the metrics service address contains an invalid port or uses environment variable expansion for the port, an error will be returned. A workaround for the environment variable case is to set `enableMetrics` to `false` and manually create the previously mentioned objects with the correct port if you need them. From a4ef4f05a4b967f605ea056c3ebcd878c6c3610a Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Wed, 11 Dec 2024 19:37:21 +0100 Subject: [PATCH 8/9] Remove unrelated/confusion doc line --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b21350aa56..7ca13f1061 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,7 @@ The `config` node holds the `YAML` that should be passed down as-is to the under The Operator does examine the configuration file for a few purposes: -- To discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. To ignore service creation set `observability.metrics.enableMetric` to `false`. You can create the Service objects yourself if you need them. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. +- To discover configured receivers and their ports. If it finds receivers with ports, it creates a pair of kubernetes services, one headless, exposing those ports within the cluster. If the port is using environment variable expansion or cannot be parsed, an error will be returned. The headless service contains a `service.beta.openshift.io/serving-cert-secret-name` annotation that will cause OpenShift to create a secret containing a certificate and key. This secret can be mounted as a volume and the certificate and key used in those receivers' TLS configurations. - To check if Collector observability is enabled (controlled by `spec.observability.metrics.enableMetrics`). In this case, a Service and ServiceMonitor/PodMonitor are created for the Collector instance. As a consequence, if the metrics service address contains an invalid port or uses environment variable expansion for the port, an error will be returned. A workaround for the environment variable case is to set `enableMetrics` to `false` and manually create the previously mentioned objects with the correct port if you need them. From a6744fd4e3501f1ba105bd10cbe80f8d28aa939b Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Thu, 12 Dec 2024 17:22:47 +0100 Subject: [PATCH 9/9] Handle review feedback --- README.md | 2 ++ apis/v1beta1/config.go | 15 ++++++++++----- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 7ca13f1061..f82c55dd89 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,8 @@ This will create an OpenTelemetry Collector instance named `simplest`, exposing The `config` node holds the `YAML` that should be passed down as-is to the underlying OpenTelemetry Collector instances. Refer to the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector) documentation for a reference of the possible entries. +> 🚨 **NOTE:** At this point, the Operator does _not_ validate the whole contents of the configuration file: if the configuration is invalid, the instance might still be created but the underlying OpenTelemetry Collector might crash. + > 🚨 **Note:** For private GKE clusters, you will need to either add a firewall rule that allows master nodes access to port `9443/tcp` on worker nodes, or change the existing rule that allows access to port `80/tcp`, `443/tcp` and `10254/tcp` to also allow access to port `9443/tcp`. More information can be found in the [Official GCP Documentation](https://cloud.google.com/load-balancing/docs/tcp/setting-up-tcp#config-hc-firewall). See the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/private-clusters#add_firewall_rules) on adding rules and the [Kubernetes issue](https://github.com/kubernetes/kubernetes/issues/79739) for more detail. The Operator does examine the configuration file for a few purposes: diff --git a/apis/v1beta1/config.go b/apis/v1beta1/config.go index 4cec32ac5d..7761433702 100644 --- a/apis/v1beta1/config.go +++ b/apis/v1beta1/config.go @@ -434,17 +434,20 @@ const ( // MetricsEndpoint attempts gets the host and port number from the host address without doing any validation regarding the // address itself. -// It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon +// It works even before env var expansion happens, when a simple `net.SplitHostPort` would fail because of the extra colon // from the env var, i.e. the address looks like "${env:POD_IP}:4317", "${env:POD_IP}", or "${POD_IP}". -// In cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}", this returns an error. -// It should work with IPv4 and IPv6 addresses. +// In cases which the port itself is a variable, i.e. "${env:POD_IP}:${env:PORT}", this returns an error. This happens +// because the port is used to generate Service objects and mappings. func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32, error) { telemetry := s.GetTelemetry() if telemetry == nil || telemetry.Metrics.Address == "" { return defaultServiceHost, defaultServicePort, nil } - isPortEnvVar := regexp.MustCompile(`:\${[env:]?.*}$`).MatchString(telemetry.Metrics.Address) + // The regex below matches on strings that end with a colon followed by the environment variable expansion syntax. + // So it should match on strings ending with: ":${env:POD_IP}" or ":${POD_IP}". + const portEnvVarRegex = `:\${[env:]?.*}$` + isPortEnvVar := regexp.MustCompile(portEnvVarRegex).MatchString(telemetry.Metrics.Address) if isPortEnvVar { errMsg := fmt.Sprintf("couldn't determine metrics port from configuration: %s", telemetry.Metrics.Address) @@ -452,7 +455,9 @@ func (s *Service) MetricsEndpoint(logger logr.Logger) (string, int32, error) { return "", 0, fmt.Errorf(errMsg) } - explicitPortMatches := regexp.MustCompile(`:(\d+$)`).FindStringSubmatch(telemetry.Metrics.Address) + // The regex below matches on strings that end with a colon followed by 1 or more numbers (representing the port). + const explicitPortRegex = `:(\d+$)` + explicitPortMatches := regexp.MustCompile(explicitPortRegex).FindStringSubmatch(telemetry.Metrics.Address) if len(explicitPortMatches) <= 1 { return telemetry.Metrics.Address, defaultServicePort, nil }