Skip to content

Commit

Permalink
[service] Move TracerProvider initialization to service/telemetry pac…
Browse files Browse the repository at this point in the history
…kage
  • Loading branch information
mx-psi committed Aug 3, 2023
1 parent 3089ea8 commit 7ea601a
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 265 deletions.
53 changes: 0 additions & 53 deletions service/internal/proctelemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"

"go.opentelemetry.io/collector/obsreport"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
Expand Down Expand Up @@ -62,7 +60,6 @@ var (
}

errNoValidMetricExporter = errors.New("no valid metric exporter")
errNoValidSpanExporter = errors.New("no valid span exporter")
)

func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncErrorChannel chan error) (sdkmetric.Reader, *http.Server, error) {
Expand All @@ -83,56 +80,6 @@ func InitMetricReader(ctx context.Context, reader telemetry.MetricReader, asyncE
return nil, nil, fmt.Errorf("unsupported metric reader type %v", reader)
}

func InitSpanProcessor(_ context.Context, processor telemetry.SpanProcessor) (sdktrace.SpanProcessor, error) {
if processor.Batch != nil {
if processor.Batch.Exporter.Console != nil {
exp, err := stdouttrace.New(
stdouttrace.WithPrettyPrint(),
)
if err != nil {
return nil, err
}
opts := []sdktrace.BatchSpanProcessorOption{}
if processor.Batch.ExportTimeout != nil {
if *processor.Batch.ExportTimeout < 0 {
return nil, fmt.Errorf("invalid export timeout %d", *processor.Batch.ExportTimeout)
}
opts = append(opts, sdktrace.WithExportTimeout(time.Millisecond*time.Duration(*processor.Batch.ExportTimeout)))
}
if processor.Batch.MaxExportBatchSize != nil {
if *processor.Batch.MaxExportBatchSize < 0 {
return nil, fmt.Errorf("invalid batch size %d", *processor.Batch.MaxExportBatchSize)
}
opts = append(opts, sdktrace.WithMaxExportBatchSize(*processor.Batch.MaxExportBatchSize))
}
if processor.Batch.MaxQueueSize != nil {
if *processor.Batch.MaxQueueSize < 0 {
return nil, fmt.Errorf("invalid queue size %d", *processor.Batch.MaxQueueSize)
}
opts = append(opts, sdktrace.WithMaxQueueSize(*processor.Batch.MaxQueueSize))
}
if processor.Batch.ScheduleDelay != nil {
if *processor.Batch.ScheduleDelay < 0 {
return nil, fmt.Errorf("invalid schedule delay %d", *processor.Batch.ScheduleDelay)
}
opts = append(opts, sdktrace.WithBatchTimeout(time.Millisecond*time.Duration(*processor.Batch.ScheduleDelay)))
}
return sdktrace.NewBatchSpanProcessor(exp, opts...), nil
}
return nil, errNoValidSpanExporter
}
return nil, fmt.Errorf("unsupported span processor type %v", processor)
}

func InitTracerProvider(res *resource.Resource, options []sdktrace.TracerProviderOption) (*sdktrace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{
sdktrace.WithResource(res),
}

opts = append(opts, options...)
return sdktrace.NewTracerProvider(opts...), nil
}

func InitOpenTelemetry(res *resource.Resource, options []sdkmetric.Option, disableHighCardinality bool) (*sdkmetric.MeterProvider, error) {
opts := []sdkmetric.Option{
sdkmetric.WithResource(res),
Expand Down
91 changes: 0 additions & 91 deletions service/internal/proctelemetry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,94 +344,3 @@ func TestMetricReader(t *testing.T) {
})
}
}

func TestSpanProcessor(t *testing.T) {
testCases := []struct {
name string
processor telemetry.SpanProcessor
args any
err error
}{
{
name: "no processor",
err: errors.New("unsupported span processor type {<nil> <nil>}"),
},
{
name: "batch processor invalid exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
Exporter: telemetry.SpanExporter{},
},
},
err: errNoValidSpanExporter,
},
{
name: "batch processor invalid batch size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(-1),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid batch size -1"),
},
{
name: "batch processor invalid export timeout console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ExportTimeout: intPtr(-2),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid export timeout -2"),
},
{
name: "batch processor invalid queue size console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxQueueSize: intPtr(-3),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid queue size -3"),
},
{
name: "batch processor invalid schedule delay console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
ScheduleDelay: intPtr(-4),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
err: errors.New("invalid schedule delay -4"),
},
{
name: "batch processor console exporter",
processor: telemetry.SpanProcessor{
Batch: &telemetry.BatchSpanProcessor{
MaxExportBatchSize: intPtr(0),
ExportTimeout: intPtr(0),
MaxQueueSize: intPtr(0),
ScheduleDelay: intPtr(0),
Exporter: telemetry.SpanExporter{
Console: telemetry.Console{},
},
},
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
_, err := InitSpanProcessor(context.Background(), tt.processor)
assert.Equal(t, tt.err, err)
})
}
}
44 changes: 44 additions & 0 deletions service/internal/resource/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package resource // import "go.opentelemetry.io/collector/service/internal/resource"

import (
"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"

"go.opentelemetry.io/collector/component"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
)

// New resource from telemetry configuration.
func New(buildInfo component.BuildInfo, resourceCfg map[string]*string) *resource.Resource {
var telAttrs []attribute.KeyValue

for k, v := range resourceCfg {
// nil value indicates that the attribute should not be included in the telemetry.
if v != nil {
telAttrs = append(telAttrs, attribute.String(k, *v))
}
}

if _, ok := resourceCfg[semconv.AttributeServiceName]; !ok {
// AttributeServiceName is not specified in the config. Use the default service name.
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command))
}

if _, ok := resourceCfg[semconv.AttributeServiceInstanceID]; !ok {
// AttributeServiceInstanceID is not specified in the config. Auto-generate one.
instanceUUID, _ := uuid.NewRandom()
instanceID := instanceUUID.String()
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID))
}

if _, ok := resourceCfg[semconv.AttributeServiceVersion]; !ok {
// AttributeServiceVersion is not specified in the config. Use the actual
// build version.
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version))
}
return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...)
}
56 changes: 11 additions & 45 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ import (
"fmt"
"runtime"

"github.com/google/uuid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/sdk/resource"
sdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -24,10 +21,10 @@ import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/receiver"
semconv "go.opentelemetry.io/collector/semconv/v1.18.0"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal/graph"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/resource"
"go.opentelemetry.io/collector/service/telemetry"
)

Expand Down Expand Up @@ -95,25 +92,24 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
res := buildResource(set.BuildInfo, cfg.Telemetry)
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

logger := srv.telemetry.Logger()
if err = srv.telemetryInitializer.initMetrics(res, logger, cfg.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

srv.telemetrySettings = component.TelemetrySettings{
Logger: srv.telemetry.Logger(),
Logger: logger,
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: noop.NewMeterProvider(),
MeterProvider: srv.telemetryInitializer.mp,
MetricsLevel: cfg.Telemetry.Metrics.Level,

// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
}

if err = srv.telemetryInitializer.init(res, srv.telemetrySettings, cfg.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}
srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp
srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp

// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
// If pipeline initialization fails then shut down the telemetry server
Expand Down Expand Up @@ -231,37 +227,7 @@ func getBallastSize(host component.Host) uint64 {
return 0
}

func buildResource(buildInfo component.BuildInfo, cfg telemetry.Config) *resource.Resource {
var telAttrs []attribute.KeyValue

for k, v := range cfg.Resource {
// nil value indicates that the attribute should not be included in the telemetry.
if v != nil {
telAttrs = append(telAttrs, attribute.String(k, *v))
}
}

if _, ok := cfg.Resource[semconv.AttributeServiceName]; !ok {
// AttributeServiceName is not specified in the config. Use the default service name.
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceName, buildInfo.Command))
}

if _, ok := cfg.Resource[semconv.AttributeServiceInstanceID]; !ok {
// AttributeServiceInstanceID is not specified in the config. Auto-generate one.
instanceUUID, _ := uuid.NewRandom()
instanceID := instanceUUID.String()
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceInstanceID, instanceID))
}

if _, ok := cfg.Resource[semconv.AttributeServiceVersion]; !ok {
// AttributeServiceVersion is not specified in the config. Use the actual
// build version.
telAttrs = append(telAttrs, attribute.String(semconv.AttributeServiceVersion, buildInfo.Version))
}
return resource.NewWithAttributes(semconv.SchemaURL, telAttrs...)
}

func pdataFromSdk(res *resource.Resource) pcommon.Resource {
func pdataFromSdk(res *sdkresource.Resource) pcommon.Resource {
// pcommon.NewResource is the best way to generate a new resource currently and is safe to use outside of tests.
// Because the resource is signal agnostic, and we need a net new resource, not an existing one, this is the only
// method of creating it without exposing internal packages.
Expand Down
Loading

0 comments on commit 7ea601a

Please sign in to comment.