From 7ea601aa76b9f59d05954bc9e9e0a399cbc7995b Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 3 Aug 2023 17:05:29 +0200 Subject: [PATCH] [service] Move TracerProvider initialization to service/telemetry package --- service/internal/proctelemetry/config.go | 53 --------- service/internal/proctelemetry/config_test.go | 91 --------------- service/internal/resource/config.go | 44 +++++++ service/service.go | 56 ++------- service/telemetry.go | 65 +---------- service/telemetry/span_processor.go | 57 ++++++++++ service/telemetry/span_processor_test.go | 107 ++++++++++++++++++ service/telemetry/telemetry.go | 13 ++- service/telemetry/tracer_provider.go | 65 +++++++++++ service/telemetry_test.go | 16 +-- 10 files changed, 302 insertions(+), 265 deletions(-) create mode 100644 service/internal/resource/config.go create mode 100644 service/telemetry/span_processor.go create mode 100644 service/telemetry/span_processor_test.go create mode 100644 service/telemetry/tracer_provider.go diff --git a/service/internal/proctelemetry/config.go b/service/internal/proctelemetry/config.go index ee20660e296b..0612031a95f6 100644 --- a/service/internal/proctelemetry/config.go +++ b/service/internal/proctelemetry/config.go @@ -22,12 +22,10 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" otelprom "go.opentelemetry.io/otel/exporters/prometheus" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/collector/obsreport" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" @@ -62,7 +60,6 @@ var ( } errNoValidMetricExporter = errors.New("no valid metric exporter") - errNoValidSpanExporter = errors.New("no valid span exporter") ) func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) { @@ -83,56 +80,6 @@ func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncE return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader) } -func InitSpanProcessor(_ context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) { - if processor.Batch != nil { - if processor.Batch.Exporter.Console != nil { - exp, err := stdouttrace.New( - stdouttrace.WithPrettyPrint(), - ) - if err != nil { - return nil, err - } - opts := []sdktrace.BatchSpanProcessorOption{} - if processor.Batch.ExportTimeout != nil { - if *processor.Batch.ExportTimeout < 0 { - return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout) - } - opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout))) - } - if processor.Batch.MaxExportBatchSize != nil { - if *processor.Batch.MaxExportBatchSize < 0 { - return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize) - } - opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize)) - } - if processor.Batch.MaxQueueSize != nil { - if *processor.Batch.MaxQueueSize < 0 { - return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize) - } - opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize)) - } - if processor.Batch.ScheduleDelay != nil { - if *processor.Batch.ScheduleDelay < 0 { - return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay) - } - opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay))) - } - return sdktrace.NewBatchSpanProcessor(exp, opts...), nil - } - return nil, errNoValidSpanExporter - } - return nil, fmt.Errorf("unsupported span processor type %v", processor) -} - -func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) { - opts := []sdktrace.TracerProviderOption{ - sdktrace.WithResource(res), - } - - opts = append(opts, options...) - return sdktrace.NewTracerProvider(opts...), nil -} - func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) { opts := []sdkmetric.Option{ sdkmetric.WithResource(res), diff --git a/service/internal/proctelemetry/config_test.go b/service/internal/proctelemetry/config_test.go index 1a1fec4e4b6d..3009b130457a 100644 --- a/service/internal/proctelemetry/config_test.go +++ b/service/internal/proctelemetry/config_test.go @@ -344,94 +344,3 @@ func TestMetricReader(t *testing.T) { }) } } - -func TestSpanProcessor(t *testing.T) { - testCases := []struct { - name string - processor telemetry.SpanProcessor - args any - err error - }{ - { - name: "no processor", - err: errors.New("unsupported span processor type { }"), - }, - { - name: "batch processor invalid exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - Exporter: telemetry.SpanExporter{}, - }, - }, - err: errNoValidSpanExporter, - }, - { - name: "batch processor invalid batch size console exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - MaxExportBatchSize: intPtr(-1), - Exporter: telemetry.SpanExporter{ - Console: telemetry.Console{}, - }, - }, - }, - err: errors.New("invalid batch size -1"), - }, - { - name: "batch processor invalid export timeout console exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - ExportTimeout: intPtr(-2), - Exporter: telemetry.SpanExporter{ - Console: telemetry.Console{}, - }, - }, - }, - err: errors.New("invalid export timeout -2"), - }, - { - name: "batch processor invalid queue size console exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - MaxQueueSize: intPtr(-3), - Exporter: telemetry.SpanExporter{ - Console: telemetry.Console{}, - }, - }, - }, - err: errors.New("invalid queue size -3"), - }, - { - name: "batch processor invalid schedule delay console exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - ScheduleDelay: intPtr(-4), - Exporter: telemetry.SpanExporter{ - Console: telemetry.Console{}, - }, - }, - }, - err: errors.New("invalid schedule delay -4"), - }, - { - name: "batch processor console exporter", - processor: telemetry.SpanProcessor{ - Batch: &telemetry.BatchSpanProcessor{ - MaxExportBatchSize: intPtr(0), - ExportTimeout: intPtr(0), - MaxQueueSize: intPtr(0), - ScheduleDelay: intPtr(0), - Exporter: telemetry.SpanExporter{ - Console: telemetry.Console{}, - }, - }, - }, - }, - } - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - _, err := InitSpanProcessor(context.Background(), tt.processor) - assert.Equal(t, tt.err, err) - }) - } -} diff --git a/service/internal/resource/config.go b/service/internal/resource/config.go new file mode 100644 index 000000000000..1f233ddde978 --- /dev/null +++ b/service/internal/resource/config.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package resource // import "go.opentelemetry.io/collector/service/internal/resource" + +import ( + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/resource" + + "go.opentelemetry.io/collector/component" + semconv "go.opentelemetry.io/collector/semconv/v1.18.0" +) + +// New resource from telemetry configuration. +func New(buildInfo component.BuildInfo, resourceCfg map[string]*string) *resource.Resource { + var telAttrs []attribute.KeyValue + + for k, v := range resourceCfg { + // nil value indicates that the attribute should not be included in the telemetry. + if v != nil { + telAttrs = append(telAttrs, attribute.String(k, *v)) + } + } + + if _, ok := resourceCfg[semconv.AttributeServiceName]; !ok { + // AttributeServiceName is not specified in the config. Use the default service name. + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command)) + } + + if _, ok := resourceCfg[semconv.AttributeServiceInstanceID]; !ok { + // AttributeServiceInstanceID is not specified in the config. Auto-generate one. + instanceUUID, _ := uuid.NewRandom() + instanceID := instanceUUID.String() + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID)) + } + + if _, ok := resourceCfg[semconv.AttributeServiceVersion]; !ok { + // AttributeServiceVersion is not specified in the config. Use the actual + // build version. + telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version)) + } + return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...) +} diff --git a/service/service.go b/service/service.go index 318f99a868c6..980a807aa593 100644 --- a/service/service.go +++ b/service/service.go @@ -8,10 +8,7 @@ import ( "fmt" "runtime" - "github.com/google/uuid" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/sdk/resource" + sdkresource "go.opentelemetry.io/otel/sdk/resource" "go.uber.org/multierr" "go.uber.org/zap" @@ -24,10 +21,10 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" - semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/telemetry" ) @@ -95,25 +92,24 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } - res := buildResource(set.BuildInfo, cfg.Telemetry) + res := resource.New(set.BuildInfo, cfg.Telemetry.Resource) pcommonRes := pdataFromSdk(res) + logger := srv.telemetry.Logger() + if err = srv.telemetryInitializer.initMetrics(res, logger, cfg.Telemetry, set.AsyncErrorChannel); err != nil { + return nil, fmt.Errorf("failed to initialize telemetry: %w", err) + } + srv.telemetrySettings = component.TelemetrySettings{ - Logger: srv.telemetry.Logger(), + Logger: logger, TracerProvider: srv.telemetry.TracerProvider(), - MeterProvider: noop.NewMeterProvider(), + MeterProvider: srv.telemetryInitializer.mp, MetricsLevel: cfg.Telemetry.Metrics.Level, // Construct telemetry attributes from build info and config's resource attributes. Resource: pcommonRes, } - if err = srv.telemetryInitializer.init(res, srv.telemetrySettings, cfg.Telemetry, set.AsyncErrorChannel); err != nil { - return nil, fmt.Errorf("failed to initialize telemetry: %w", err) - } - srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp - srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp - // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil { // If pipeline initialization fails then shut down the telemetry server @@ -231,37 +227,7 @@ func getBallastSize(host component.Host) uint64 { return 0 } -func buildResource(buildInfo component.BuildInfo, cfg telemetry.Config) *resource.Resource { - var telAttrs []attribute.KeyValue - - for k, v := range cfg.Resource { - // nil value indicates that the attribute should not be included in the telemetry. - if v != nil { - telAttrs = append(telAttrs, attribute.String(k, *v)) - } - } - - if _, ok := cfg.Resource[semconv.AttributeServiceName]; !ok { - // AttributeServiceName is not specified in the config. Use the default service name. - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command)) - } - - if _, ok := cfg.Resource[semconv.AttributeServiceInstanceID]; !ok { - // AttributeServiceInstanceID is not specified in the config. Auto-generate one. - instanceUUID, _ := uuid.NewRandom() - instanceID := instanceUUID.String() - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID)) - } - - if _, ok := cfg.Resource[semconv.AttributeServiceVersion]; !ok { - // AttributeServiceVersion is not specified in the config. Use the actual - // build version. - telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version)) - } - return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...) -} - -func pdataFromSdk(res *resource.Resource) pcommon.Resource { +func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource { // pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests. // Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only // method of creating it without exposing internal packages. diff --git a/service/telemetry.go b/service/telemetry.go index 70fe22bc6371..db387a49d956 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -5,7 +5,6 @@ package service // import "go.opentelemetry.io/collector/service" import ( "context" - "errors" "net" "net/http" "strconv" @@ -17,19 +16,14 @@ import ( ocmetric "go.opencensus.io/metric" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats/view" - "go.opentelemetry.io/contrib/propagators/b3" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/propagation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/service/internal/proctelemetry" @@ -39,14 +33,6 @@ import ( const ( zapKeyTelemetryAddress = "address" zapKeyTelemetryLevel = "level" - - // supported trace propagators - traceContextPropagator = "tracecontext" - b3Propagator = "b3" -) - -var ( - errUnsupportedPropagator = errors.New("unsupported trace propagator") ) type telemetryInitializer struct { @@ -71,46 +57,18 @@ func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig b } } -func (tel *telemetryInitializer) init(res *resource.Resource, settings component.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { +func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { - settings.Logger.Info( - "Skipping telemetry setup.", + logger.Info( + "Skipping telemetry metrics setup.", zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address), zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()), ) return nil } - settings.Logger.Info("Setting up own telemetry...") - - if tp, err := tel.initTraces(res, cfg); err == nil { - tel.tp = tp - } else { - return err - } - - if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { - otel.SetTextMapPropagator(tp) - } else { - return err - } - - return tel.initMetrics(res, settings.Logger, cfg, asyncErrorChannel) -} - -func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) { - opts := []sdktrace.TracerProviderOption{} - for _, processor := range cfg.Traces.Processors { - sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor) - if err != nil { - return nil, err - } - opts = append(opts, sdktrace.WithSpanProcessor(sp)) - } - return proctelemetry.InitTracerProvider(res, opts) -} + logger.Info("Setting up metrics telemetry...") -func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { // Initialize the ocRegistry, still used by the process metrics. tel.ocRegistry = ocmetric.NewRegistry() if !tel.useOtel && !tel.extendedConfig { @@ -229,18 +187,3 @@ func sanitizePrometheusKey(str string) string { } return strings.Map(runeFilterMap, str) } - -func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { - var textMapPropagators []propagation.TextMapPropagator - for _, prop := range props { - switch prop { - case traceContextPropagator: - textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) - case b3Propagator: - textMapPropagators = append(textMapPropagators, b3.New()) - default: - return nil, errUnsupportedPropagator - } - } - return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil -} diff --git a/service/telemetry/span_processor.go b/service/telemetry/span_processor.go new file mode 100644 index 000000000000..adb78c8a3ece --- /dev/null +++ b/service/telemetry/span_processor.go @@ -0,0 +1,57 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "context" + "errors" + "fmt" + "time" + + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +var errNoValidSpanExporter = errors.New("no valid span exporter") + +func newSpanProcessor(_ context.Context, processor SpanProcessor) (sdktrace.SpanProcessor, error) { + if processor.Batch != nil { + if processor.Batch.Exporter.Console != nil { + exp, err := stdouttrace.New( + stdouttrace.WithPrettyPrint(), + ) + if err != nil { + return nil, err + } + opts := []sdktrace.BatchSpanProcessorOption{} + if processor.Batch.ExportTimeout != nil { + if *processor.Batch.ExportTimeout < 0 { + return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout) + } + opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout))) + } + if processor.Batch.MaxExportBatchSize != nil { + if *processor.Batch.MaxExportBatchSize < 0 { + return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize) + } + opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize)) + } + if processor.Batch.MaxQueueSize != nil { + if *processor.Batch.MaxQueueSize < 0 { + return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize) + } + opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize)) + } + if processor.Batch.ScheduleDelay != nil { + if *processor.Batch.ScheduleDelay < 0 { + return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay) + } + opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay))) + } + return sdktrace.NewBatchSpanProcessor(exp, opts...), nil + } + return nil, errNoValidSpanExporter + } + return nil, fmt.Errorf("unsupported span processor type %v", processor) +} diff --git a/service/telemetry/span_processor_test.go b/service/telemetry/span_processor_test.go new file mode 100644 index 000000000000..6d3ab46015dd --- /dev/null +++ b/service/telemetry/span_processor_test.go @@ -0,0 +1,107 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func intPtr(i int) *int { + return &i +} + +func TestSpanProcessor(t *testing.T) { + testCases := []struct { + name string + processor SpanProcessor + args any + err error + }{ + { + name: "no processor", + err: errors.New("unsupported span processor type { }"), + }, + { + name: "batch processor invalid exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + Exporter: SpanExporter{}, + }, + }, + err: errNoValidSpanExporter, + }, + { + name: "batch processor invalid batch size console exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + MaxExportBatchSize: intPtr(-1), + Exporter: SpanExporter{ + Console: Console{}, + }, + }, + }, + err: errors.New("invalid batch size -1"), + }, + { + name: "batch processor invalid export timeout console exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + ExportTimeout: intPtr(-2), + Exporter: SpanExporter{ + Console: Console{}, + }, + }, + }, + err: errors.New("invalid export timeout -2"), + }, + { + name: "batch processor invalid queue size console exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + MaxQueueSize: intPtr(-3), + Exporter: SpanExporter{ + Console: Console{}, + }, + }, + }, + err: errors.New("invalid queue size -3"), + }, + { + name: "batch processor invalid schedule delay console exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + ScheduleDelay: intPtr(-4), + Exporter: SpanExporter{ + Console: Console{}, + }, + }, + }, + err: errors.New("invalid schedule delay -4"), + }, + { + name: "batch processor console exporter", + processor: SpanProcessor{ + Batch: &BatchSpanProcessor{ + MaxExportBatchSize: intPtr(0), + ExportTimeout: intPtr(0), + MaxQueueSize: intPtr(0), + ScheduleDelay: intPtr(0), + Exporter: SpanExporter{ + Console: Console{}, + }, + }, + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + _, err := newSpanProcessor(context.Background(), tt.processor) + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go index 00f9db630b0e..2eff4ab9ac47 100644 --- a/service/telemetry/telemetry.go +++ b/service/telemetry/telemetry.go @@ -6,6 +6,7 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "context" + "go.opentelemetry.io/collector/component" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" @@ -35,19 +36,21 @@ func (t *Telemetry) Shutdown(ctx context.Context) error { // Settings holds configuration for building Telemetry. type Settings struct { + BuildInfo component.BuildInfo ZapOptions []zap.Option } // New creates a new Telemetry from Config. -func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) { +func New(ctx context.Context, set Settings, cfg Config) (*Telemetry, error) { logger, err := newLogger(cfg.Logs, set.ZapOptions) if err != nil { return nil, err } - tp := sdktrace.NewTracerProvider( - // needed for supporting the zpages extension - sdktrace.WithSampler(alwaysRecord()), - ) + tp, err := newTracerProvider(ctx, set, cfg) + if err != nil { + return nil, err + } + return &Telemetry{ logger: logger, tracerProvider: tp, diff --git a/service/telemetry/tracer_provider.go b/service/telemetry/tracer_provider.go new file mode 100644 index 000000000000..c56d578c905d --- /dev/null +++ b/service/telemetry/tracer_provider.go @@ -0,0 +1,65 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "context" + "errors" + + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + + "go.opentelemetry.io/collector/service/internal/resource" +) + +const ( + // supported trace propagators + traceContextPropagator = "tracecontext" + b3Propagator = "b3" +) + +var ( + errUnsupportedPropagator = errors.New("unsupported trace propagator") +) + +func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { + var textMapPropagators []propagation.TextMapPropagator + for _, prop := range props { + switch prop { + case traceContextPropagator: + textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) + case b3Propagator: + textMapPropagators = append(textMapPropagators, b3.New()) + default: + return nil, errUnsupportedPropagator + } + } + return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil +} + +func newTracerProvider(ctx context.Context, set Settings, cfg Config) (*sdktrace.TracerProvider, error) { + res := resource.New(set.BuildInfo, cfg.Resource) + opts := []sdktrace.TracerProviderOption{ + sdktrace.WithResource(res), + // needed for supporting the zpages extension + sdktrace.WithSampler(alwaysRecord()), + } + for _, processor := range cfg.Traces.Processors { + sp, err := newSpanProcessor(ctx, processor) + if err != nil { + return nil, err + } + opts = append(opts, sdktrace.WithSpanProcessor(sp)) + } + + if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { + otel.SetTextMapPropagator(tp) + } else { + return nil, err + } + + return sdktrace.NewTracerProvider(opts...), nil +} diff --git a/service/telemetry_test.go b/service/telemetry_test.go index a4144e27890c..daf2cb760be8 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/resource" "go.opentelemetry.io/collector/service/telemetry" ) @@ -40,7 +41,7 @@ func TestBuildResource(t *testing.T) { // Check default config cfg := telemetry.Config{} - otelRes := buildResource(buildInfo, cfg) + otelRes := resource.New(buildInfo, cfg.Resource) res := pdataFromSdk(otelRes) assert.Equal(t, res.Attributes().Len(), 3) @@ -62,7 +63,7 @@ func TestBuildResource(t *testing.T) { semconv.AttributeServiceInstanceID: nil, }, } - otelRes = buildResource(buildInfo, cfg) + otelRes = resource.New(buildInfo, cfg.Resource) res = pdataFromSdk(otelRes) // Attributes should not exist since we nil-ified all. @@ -77,7 +78,7 @@ func TestBuildResource(t *testing.T) { semconv.AttributeServiceInstanceID: strPtr("c"), }, } - otelRes = buildResource(buildInfo, cfg) + otelRes = resource.New(buildInfo, cfg.Resource) res = pdataFromSdk(otelRes) assert.Equal(t, res.Attributes().Len(), 3) @@ -270,13 +271,8 @@ func TestTelemetryInit(t *testing.T) { }, } } - otelRes := buildResource(buildInfo, *tc.cfg) - res := pdataFromSdk(otelRes) - settings := component.TelemetrySettings{ - Logger: zap.NewNop(), - Resource: res, - } - err := tel.init(otelRes, settings, *tc.cfg, make(chan error)) + otelRes := resource.New(buildInfo, tc.cfg.Resource) + err := tel.initMetrics(otelRes, zap.NewNop(), *tc.cfg, make(chan error)) require.NoError(t, err) defer func() { require.NoError(t, tel.shutdown())