From 58c9d2919046cf36299fca9e7c777533dc3710a2 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 19 Jan 2024 17:20:43 +0100 Subject: [PATCH 1/3] [service] Refactor telemetry initialization --- service/internal/resource/config.go | 44 ++++++++++++++++++++++ service/service.go | 57 ++++++----------------------- service/telemetry.go | 9 ++--- service/telemetry_test.go | 17 +++------ 4 files changed, 65 insertions(+), 62 deletions(-) create mode 100644 service/internal/resource/config.go diff --git a/service/internal/resource/config.go b/service/internal/resource/config.go new file mode 100644 index 00000000000..1f233ddde97 --- /dev/null +++ b/service/internal/resource/config.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package resource // import "go.opentelemetry.io/collector/service/internal/resource" + +import ( + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/resource" + + "go.opentelemetry.io/collector/component" + semconv "go.opentelemetry.io/collector/semconv/v1.18.0" +) + +// New resource from telemetry configuration. +func New(buildInfo component.BuildInfo, resourceCfg map[string]*string) *resource.Resource { + var telAttrs []attribute.KeyValue + + for k, v := range resourceCfg { + // nil value indicates that the attribute should not be included in the telemetry. + if v != nil { + telAttrs = append(telAttrs, attribute.String(k, *v)) + } + } + + if _, ok := resourceCfg[semconv.AttributeServiceName]; !ok { + // AttributeServiceName is not specified in the config. Use the default service name. + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command)) + } + + if _, ok := resourceCfg[semconv.AttributeServiceInstanceID]; !ok { + // AttributeServiceInstanceID is not specified in the config. Auto-generate one. + instanceUUID, _ := uuid.NewRandom() + instanceID := instanceUUID.String() + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID)) + } + + if _, ok := resourceCfg[semconv.AttributeServiceVersion]; !ok { + // AttributeServiceVersion is not specified in the config. Use the actual + // build version. + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version)) + } + return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...) +} diff --git a/service/service.go b/service/service.go index 32ed78d465e..cca5e37b3df 100644 --- a/service/service.go +++ b/service/service.go @@ -9,10 +9,7 @@ import ( "fmt" "runtime" - "github.com/google/uuid" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/sdk/resource" + sdkresource "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/multierr" "go.uber.org/zap" @@ -26,10 +23,10 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" @@ -97,13 +94,17 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } - res := buildResource(set.BuildInfo, cfg.Telemetry) + res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) + logger := srv.telemetry.Logger() + if err = srv.telemetryInitializer.init(res, logger, cfg.Telemetry, set.AsyncErrorChannel); err != nil { + return nil, fmt.Errorf("failed to initialize telemetry: %w", err) + } srv.telemetrySettings = servicetelemetry.TelemetrySettings{ - Logger: srv.telemetry.Logger(), - TracerProvider: srv.telemetry.TracerProvider(), - MeterProvider: noop.NewMeterProvider(), + Logger: logger, + TracerProvider: srv.telemetryInitializer.tp, + MeterProvider: srv.telemetryInitializer.mp, MetricsLevel: cfg.Telemetry.Metrics.Level, // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, @@ -115,12 +116,6 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { }), } - if err = srv.telemetryInitializer.init(res, srv.telemetrySettings, cfg.Telemetry, set.AsyncErrorChannel); err != nil { - return nil, fmt.Errorf("failed to initialize telemetry: %w", err) - } - srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp - srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp - // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil { // If pipeline initialization fails then shut down the telemetry server @@ -257,37 +252,7 @@ func getBallastSize(host component.Host) uint64 { return 0 } -func buildResource(buildInfo component.BuildInfo, cfg telemetry.Config) *resource.Resource { - var telAttrs []attribute.KeyValue - - for k, v := range cfg.Resource { - // nil value indicates that the attribute should not be included in the telemetry. - if v != nil { - telAttrs = append(telAttrs, attribute.String(k, *v)) - } - } - - if _, ok := cfg.Resource[semconv.AttributeServiceName]; !ok { - // AttributeServiceName is not specified in the config. Use the default service name. - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command)) - } - - if _, ok := cfg.Resource[semconv.AttributeServiceInstanceID]; !ok { - // AttributeServiceInstanceID is not specified in the config. Auto-generate one. - instanceUUID, _ := uuid.NewRandom() - instanceID := instanceUUID.String() - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID)) - } - - if _, ok := cfg.Resource[semconv.AttributeServiceVersion]; !ok { - // AttributeServiceVersion is not specified in the config. Use the actual - // build version. - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version)) - } - return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...) -} - -func pdataFromSdk(res *resource.Resource) pcommon.Resource { +func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource { // pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests. // Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only // method of creating it without exposing internal packages. diff --git a/service/telemetry.go b/service/telemetry.go index de2837b10bc..bbf10f45f55 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/telemetry" ) @@ -64,9 +63,9 @@ func newColTelemetry(disableHighCardinality bool, extendedConfig bool) *telemetr } } -func (tel *telemetryInitializer) init(res *resource.Resource, settings servicetelemetry.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { +func (tel *telemetryInitializer) init(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { - settings.Logger.Info( + logger.Info( "Skipping telemetry setup.", zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address), zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()), @@ -74,7 +73,7 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings servicete return nil } - settings.Logger.Info("Setting up own telemetry...") + logger.Info("Setting up own telemetry...") if tp, err := tel.initTraces(res, cfg); err == nil { tel.tp = tp @@ -88,7 +87,7 @@ func (tel *telemetryInitializer) init(res *resource.Resource, settings servicete return err } - return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel) + return tel.initMetrics(res, logger, cfg, asyncErrorChannel) } func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) { diff --git a/service/telemetry_test.go b/service/telemetry_test.go index afd3766fe41..5f0163dadea 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -24,7 +24,7 @@ import ( "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" - "go.opentelemetry.io/collector/service/internal/servicetelemetry" + "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/telemetry" ) @@ -42,7 +42,7 @@ func TestBuildResource(t *testing.T) { // Check default config cfg := telemetry.Config{} - otelRes := buildResource(buildInfo, cfg) + otelRes := resource.New(buildInfo, cfg.Resource) res := pdataFromSdk(otelRes) assert.Equal(t, res.Attributes().Len(), 3) @@ -64,7 +64,7 @@ func TestBuildResource(t *testing.T) { semconv.AttributeServiceInstanceID: nil, }, } - otelRes = buildResource(buildInfo, cfg) + otelRes = resource.New(buildInfo, cfg.Resource) res = pdataFromSdk(otelRes) // Attributes should not exist since we nil-ified all. @@ -79,7 +79,7 @@ func TestBuildResource(t *testing.T) { semconv.AttributeServiceInstanceID: strPtr("c"), }, } - otelRes = buildResource(buildInfo, cfg) + otelRes = resource.New(buildInfo, cfg.Resource) res = pdataFromSdk(otelRes) assert.Equal(t, res.Attributes().Len(), 3) @@ -299,13 +299,8 @@ func TestTelemetryInit(t *testing.T) { }, } } - otelRes := buildResource(buildInfo, *tc.cfg) - res := pdataFromSdk(otelRes) - settings := servicetelemetry.TelemetrySettings{ - Logger: zap.NewNop(), - Resource: res, - } - err := tel.init(otelRes, settings, *tc.cfg, make(chan error)) + otelRes := resource.New(buildInfo, tc.cfg.Resource) + err := tel.init(otelRes, zap.NewNop(), *tc.cfg, make(chan error)) require.NoError(t, err) defer func() { require.NoError(t, tel.shutdown()) From 3ec1771e3217b631d5a24559c05addaa9d631e63 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 19 Jan 2024 17:49:49 +0100 Subject: [PATCH 2/3] Add resource test --- service/internal/resource/config_test.go | 102 +++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 service/internal/resource/config_test.go diff --git a/service/internal/resource/config_test.go b/service/internal/resource/config_test.go new file mode 100644 index 00000000000..1c2ca870ef9 --- /dev/null +++ b/service/internal/resource/config_test.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package resource + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" +) + +const ( + randomUUIDSpecialValue = "random-uuid" +) + +var buildInfo = component.BuildInfo{ + Command: "otelcol", + Version: "1.0.0", +} + +func ptr[T any](v T) *T { + return &v +} + +func TestNew(t *testing.T) { + tests := []struct { + name string + resourceCfg map[string]*string + want map[string]string + }{ + { + name: "empty", + resourceCfg: map[string]*string{}, + want: map[string]string{ + "service.name": "otelcol", + "service.version": "1.0.0", + "service.instance.id": randomUUIDSpecialValue, + }, + }, + { + name: "overwrite", + resourceCfg: map[string]*string{ + "service.name": ptr("my-service"), + "service.version": ptr("1.2.3"), + "service.instance.id": ptr("123"), + }, + want: map[string]string{ + "service.name": "my-service", + "service.version": "1.2.3", + "service.instance.id": "123", + }, + }, + { + name: "remove", + resourceCfg: map[string]*string{ + "service.name": nil, + "service.version": nil, + "service.instance.id": nil, + }, + want: map[string]string{}, + }, + { + name: "add", + resourceCfg: map[string]*string{ + "host.name": ptr("my-host"), + }, + want: map[string]string{ + "service.name": "otelcol", + "service.version": "1.0.0", + "service.instance.id": randomUUIDSpecialValue, + "host.name": "my-host", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res := New(buildInfo, tt.resourceCfg) + got := make(map[string]string) + for _, attr := range res.Attributes() { + got[string(attr.Key)] = attr.Value.Emit() + } + + if tt.want["service.instance.id"] == randomUUIDSpecialValue { + assert.Contains(t, got, "service.instance.id") + + // Check that the value is a valid UUID. + _, err := uuid.Parse(got["service.instance.id"]) + assert.NoError(t, err) + + // Remove so that we can compare the rest of the map. + delete(got, "service.instance.id") + delete(tt.want, "service.instance.id") + } + + assert.EqualValues(t, tt.want, got) + }) + } + +} From d00d17de1cadc8092d5ce5ff27d914e49f4fe518 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Fri, 19 Jan 2024 17:57:16 +0100 Subject: [PATCH 3/3] make fmt --- service/internal/resource/config_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/service/internal/resource/config_test.go b/service/internal/resource/config_test.go index 1c2ca870ef9..ab8f8e816c3 100644 --- a/service/internal/resource/config_test.go +++ b/service/internal/resource/config_test.go @@ -8,6 +8,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" )