From 895252165d24e1215d19eba79f0bf699c744972c Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Sat, 30 Nov 2024 03:33:47 +0100 Subject: [PATCH] [receiver/googlecloudpubsub] Add support for encoding extensions (#37109) Added support for encoding extensions. Setting the encoding field in the config now references the extension. If it didn't find the extension it will fall back to searching the internal encoders. To make the build in encoders consistent with the extensions they now have the same interface. The README is adapted accordingly. --- .../pubsubreceiver-encodingextensions.yaml | 11 + receiver/googlecloudpubsubreceiver/README.md | 54 ++-- receiver/googlecloudpubsubreceiver/config.go | 45 ---- .../googlecloudpubsubreceiver/config_test.go | 60 ----- receiver/googlecloudpubsubreceiver/factory.go | 6 +- receiver/googlecloudpubsubreceiver/go.mod | 5 +- receiver/googlecloudpubsubreceiver/go.sum | 8 +- .../internal/log_entry.go | 4 +- .../internal/log_entry_test.go | 7 +- .../googlecloudpubsubreceiver/receiver.go | 248 ++++++++++++++---- .../receiver_test.go | 167 +++++++++++- 11 files changed, 415 insertions(+), 200 deletions(-) create mode 100644 .chloggen/pubsubreceiver-encodingextensions.yaml diff --git a/.chloggen/pubsubreceiver-encodingextensions.yaml b/.chloggen/pubsubreceiver-encodingextensions.yaml new file mode 100644 index 000000000000..efbeca18fe5f --- /dev/null +++ b/.chloggen/pubsubreceiver-encodingextensions.yaml @@ -0,0 +1,11 @@ +change_type: enhancement + +component: googlecloudpubsubreceiver + +note: Added support for encoding extensions. 
+ +issues: [37109] + +subtext: + +change_logs: [user] diff --git a/receiver/googlecloudpubsubreceiver/README.md b/receiver/googlecloudpubsubreceiver/README.md index bb8edb3b4dcd..21b4a419b650 100644 --- a/receiver/googlecloudpubsubreceiver/README.md +++ b/receiver/googlecloudpubsubreceiver/README.md @@ -41,29 +41,53 @@ receivers: ## Encoding -You should not need to set the encoding of the subscription as the receiver will try to discover the type of the data -by looking at the `ce-type` and `content-type` attributes of the message. Only when those attributes are not set -must the `encoding` field in the configuration be set. +The `encoding` option allows you to specify Encoding Extensions for decoding messages on the subscription. An +extension needs to be configured in the `extensions` section, and added to the pipeline in the collector's configuration file. -| ce-type | ce-datacontenttype | encoding | description | -|-----------------------------------|----------------------|-------------------|------------------------------------------------| -| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | -| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | -| org.opentelemetry.otlp.logs.v1 | application/json | | Decode OTLP log message | -| - | - | otlp_proto_trace | Decode OTLP trace message | -| - | - | otlp_proto_metric | Decode OTLP trace message | -| - | - | otlp_proto_log | Decode OTLP trace message | -| - | - | cloud_logging | Decode [Cloud Logging] [LogEntry] message type | -| - | - | raw_text | Wrap in an OTLP log message | +The following example shows how to use the text encoding extension for ingesting arbitrary text messages on a +subscription, wrapping them in OTLP Log messages. Note that not all extensions support all signals. 
+ +```yaml +extensions: + text_encoding: + encoding: utf8 + unmarshaling_separator: "\r?\n" + +service: + extensions: [text_encoding] + pipelines: + logs: + receivers: [googlecloudpubsub] + processors: [] + exporters: [debug] +``` -When the `encoding` configuration is set, the attributes on the message are ignored. +The receiver also supports built-in encodings for the native OTLP encodings, without the need to specify an Encoding +Extension. The non-OTLP built-in encodings will be deprecated as soon as extensions for the formats are available. + +| encoding | description | +|-------------------|------------------------------------------------| +| otlp_proto_trace | Decode OTLP trace message | +| otlp_proto_metric | Decode OTLP metric message | +| otlp_proto_log | Decode OTLP log message | +| cloud_logging | Decode [Cloud Logging] [LogEntry] message type | +| raw_text | Wrap in an OTLP log message | With `cloud_logging`, the receiver can be used to bring Cloud Logging messages into an OpenTelemetry pipeline. You'll first need to [set up a logging sink][sink-docs] with a Pub/Sub topic as its destination. Note that the `cloud_logging` integration is considered **alpha** as the semantic convention on some of the conversion are not stabilized yet. With `raw_text`, the receiver can be used for ingesting arbitrary text message on a Pubsub subscription, wrapping them -in OTLP Log messages, making it a convenient way to ingest raw log lines from Pubsub. +in OTLP Log messages. + +When no encoding is specified, the receiver will try to discover the type of the data by looking at the `ce-type` and +`content-type` attributes of the message. These message attributes are set by the `googlepubsubexporter`. 
+ +| ce-type | ce-datacontenttype | encoding | description | +|-----------------------------------|----------------------|-------------------|------------------------------------------------| +| org.opentelemetry.otlp.traces.v1 | application/protobuf | | Decode OTLP trace message | +| org.opentelemetry.otlp.metrics.v1 | application/protobuf | | Decode OTLP metric message | +| org.opentelemetry.otlp.logs.v1 | application/protobuf | | Decode OTLP log message | [Cloud Logging]: https://cloud.google.com/logging [LogEntry]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry diff --git a/receiver/googlecloudpubsubreceiver/config.go b/receiver/googlecloudpubsubreceiver/config.go index 8dbdb8b9a3e7..d167a29dd63b 100644 --- a/receiver/googlecloudpubsubreceiver/config.go +++ b/receiver/googlecloudpubsubreceiver/config.go @@ -35,51 +35,6 @@ type Config struct { ClientID string `mapstructure:"client_id"` } -func (config *Config) validateForLog() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_log": - case "raw_text": - case "raw_json": - case "cloud_logging": - default: - return fmt.Errorf("log encoding %v is not supported. supported encoding formats include [otlp_proto_log,raw_text,raw_json,cloud_logging]", config.Encoding) - } - return nil -} - -func (config *Config) validateForTrace() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_trace": - default: - return fmt.Errorf("trace encoding %v is not supported. supported encoding formats include [otlp_proto_trace]", config.Encoding) - } - return nil -} - -func (config *Config) validateForMetric() error { - err := config.validate() - if err != nil { - return err - } - switch config.Encoding { - case "": - case "otlp_proto_metric": - default: - return fmt.Errorf("metric encoding %v is not supported. 
supported encoding formats include [otlp_proto_metric]", config.Encoding) - } - return nil -} - func (config *Config) validate() error { if !subscriptionMatcher.MatchString(config.Subscription) { return fmt.Errorf("subscription '%s' is not a valid format, use 'projects//subscriptions/'", config.Subscription) diff --git a/receiver/googlecloudpubsubreceiver/config_test.go b/receiver/googlecloudpubsubreceiver/config_test.go index 7dfb798ab6be..6b86acc14bb4 100644 --- a/receiver/googlecloudpubsubreceiver/config_test.go +++ b/receiver/googlecloudpubsubreceiver/config_test.go @@ -63,9 +63,6 @@ func TestLoadConfig(t *testing.T) { func TestConfigValidation(t *testing.T) { factory := NewFactory() c := factory.CreateDefaultConfig().(*Config) - assert.Error(t, c.validateForTrace()) - assert.Error(t, c.validateForLog()) - assert.Error(t, c.validateForMetric()) c.Subscription = "projects/000project/subscriptions/my-subscription" assert.Error(t, c.validate()) c.Subscription = "projects/my-project/topics/my-topic" @@ -73,60 +70,3 @@ func TestConfigValidation(t *testing.T) { c.Subscription = "projects/my-project/subscriptions/my-subscription" assert.NoError(t, c.validate()) } - -func TestTraceConfigValidation(t *testing.T) { - factory := NewFactory() - c := factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, c.validateForTrace()) - - c.Encoding = "otlp_proto_metric" - assert.Error(t, c.validateForTrace()) - c.Encoding = "otlp_proto_log" - assert.Error(t, c.validateForTrace()) - c.Encoding = "raw_text" - assert.Error(t, c.validateForTrace()) - c.Encoding = "raw_json" - assert.Error(t, c.validateForTrace()) - - c.Encoding = "otlp_proto_trace" - assert.NoError(t, c.validateForTrace()) -} - -func TestMetricConfigValidation(t *testing.T) { - factory := NewFactory() - c := factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, 
c.validateForMetric()) - - c.Encoding = "otlp_proto_trace" - assert.Error(t, c.validateForMetric()) - c.Encoding = "otlp_proto_log" - assert.Error(t, c.validateForMetric()) - c.Encoding = "raw_text" - assert.Error(t, c.validateForMetric()) - c.Encoding = "raw_json" - assert.Error(t, c.validateForMetric()) - - c.Encoding = "otlp_proto_metric" - assert.NoError(t, c.validateForMetric()) -} - -func TestLogConfigValidation(t *testing.T) { - factory := NewFactory() - c := factory.CreateDefaultConfig().(*Config) - c.Subscription = "projects/my-project/subscriptions/my-subscription" - assert.NoError(t, c.validateForLog()) - - c.Encoding = "otlp_proto_trace" - assert.Error(t, c.validateForLog()) - c.Encoding = "otlp_proto_metric" - assert.Error(t, c.validateForLog()) - - c.Encoding = "raw_text" - assert.NoError(t, c.validateForLog()) - c.Encoding = "raw_json" - assert.NoError(t, c.validateForLog()) - c.Encoding = "otlp_proto_log" - assert.NoError(t, c.validateForLog()) -} diff --git a/receiver/googlecloudpubsubreceiver/factory.go b/receiver/googlecloudpubsubreceiver/factory.go index 96ccc49d5814..802718a55fb4 100644 --- a/receiver/googlecloudpubsubreceiver/factory.go +++ b/receiver/googlecloudpubsubreceiver/factory.go @@ -71,7 +71,7 @@ func (factory *pubsubReceiverFactory) CreateTraces( cfg component.Config, consumer consumer.Traces, ) (receiver.Traces, error) { - err := cfg.(*Config).validateForTrace() + err := cfg.(*Config).validate() if err != nil { return nil, err } @@ -89,7 +89,7 @@ func (factory *pubsubReceiverFactory) CreateMetrics( cfg component.Config, consumer consumer.Metrics, ) (receiver.Metrics, error) { - err := cfg.(*Config).validateForMetric() + err := cfg.(*Config).validate() if err != nil { return nil, err } @@ -107,7 +107,7 @@ func (factory *pubsubReceiverFactory) CreateLogs( cfg component.Config, consumer consumer.Logs, ) (receiver.Logs, error) { - err := cfg.(*Config).validateForLog() + err := cfg.(*Config).validate() if err != nil { return nil, err } 
diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index a34ff926fc41..1347efebf72f 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -8,6 +8,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/iancoleman/strcase v0.3.0 github.com/json-iterator/go v1.1.12 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.117.0 go.opentelemetry.io/collector/component/componenttest v0.117.0 @@ -36,7 +37,7 @@ require ( cloud.google.com/go/iam v1.2.2 // indirect cloud.google.com/go/longrunning v0.6.2 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -55,7 +56,7 @@ require ( github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect go.einride.tech/aip v0.68.0 // indirect go.opencensus.io v0.24.0 // indirect diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index 5ca8b1668615..0bf3653fb0d4 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -22,8 +22,9 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod 
h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -100,8 +101,11 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0 h1:I6jpBR2wbwYskCumSef5CGHG5JXIyVzkKZi07ipkHlY= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.114.0/go.mod h1:tY9HNnVFA9+vvEZzKlpji60IffBbVtZKktsMDcWOguc= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry.go b/receiver/googlecloudpubsubreceiver/internal/log_entry.go index 282ed890e6f3..8ebc3f2fd9a3 100644 --- a/receiver/googlecloudpubsubreceiver/internal/log_entry.go +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry.go @@ -5,7 +5,6 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co import ( "bytes" - "context" "encoding/hex" stdjson "encoding/json" "errors" @@ -20,7 +19,6 @@ import ( jsoniter "github.com/json-iterator/go" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" - "go.uber.org/zap" "google.golang.org/genproto/googleapis/api/monitoredres" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/reflect/protoreflect" @@ -113,7 +111,7 @@ func getLogEntryDescriptor() protoreflect.MessageDescriptor { // schema; this ensures that a numeric value in the input is correctly // translated to either an integer or a double in the output. It falls back to // plain JSON decoding if payload type is not available in the proto registry. 
-func TranslateLogEntry(_ context.Context, _ *zap.Logger, data []byte) (pcommon.Resource, plog.LogRecord, error) { +func TranslateLogEntry(data []byte) (pcommon.Resource, plog.LogRecord, error) { lr := plog.NewLogRecord() res := pcommon.NewResource() diff --git a/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go b/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go index a5b959b06013..5eef975189f0 100644 --- a/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go +++ b/receiver/googlecloudpubsubreceiver/internal/log_entry_test.go @@ -4,7 +4,6 @@ package internal import ( - "context" "fmt" "testing" "time" @@ -15,7 +14,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" - "go.uber.org/zap" ) type Log struct { @@ -70,15 +68,12 @@ func TestTranslateLogEntry(t *testing.T) { }{ // TODO: Add publicly shareable log test data. } - - logger, _ := zap.NewDevelopment() - for _, tt := range tests { var errs error wantRes, wantLr, err := generateLog(t, tt.want) errs = multierr.Append(errs, err) - gotRes, gotLr, err := TranslateLogEntry(context.TODO(), logger, []byte(tt.input)) + gotRes, gotLr, err := TranslateLogEntry([]byte(tt.input)) errs = multierr.Append(errs, err) errs = multierr.Combine(errs, compareResources(wantRes, gotRes), compareLogRecords(wantLr, gotLr)) diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 31b53a3c2851..e589ec6f3a31 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" ) @@ -50,24 +51,39 @@ type pubsubReceiver struct { startOnce sync.Once } 
-type encoding int +type buildInEncoding int const ( - unknown encoding = iota - otlpProtoTrace = iota - otlpProtoMetric = iota - otlpProtoLog = iota - rawTextLog = iota - cloudLogging = iota + unknown buildInEncoding = iota + otlpProtoTrace = iota + otlpProtoMetric = iota + otlpProtoLog = iota + rawTextLog = iota + cloudLogging = iota ) -type compression int +type buildInCompression int const ( - uncompressed compression = iota - gZip = iota + uncompressed buildInCompression = iota + gZip = iota ) +// consumerCount returns the number of attached consumers, useful for detecting errors in pipelines +func (receiver *pubsubReceiver) consumerCount() int { + count := 0 + if receiver.logsConsumer != nil { + count++ + } + if receiver.metricsConsumer != nil { + count++ + } + if receiver.tracesConsumer != nil { + count++ + } + return count +} + func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOption) { if receiver.userAgent != "" { copts = append(copts, option.WithUserAgent(receiver.userAgent)) @@ -87,11 +103,86 @@ func (receiver *pubsubReceiver) generateClientOptions() (copts []option.ClientOp return copts } -func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) error { +func (receiver *pubsubReceiver) Start(ctx context.Context, host component.Host) error { if receiver.tracesConsumer == nil && receiver.metricsConsumer == nil && receiver.logsConsumer == nil { return errors.New("cannot start receiver: no consumers were specified") } + var createHandlerFn func(context.Context) error + + if receiver.config.Encoding != "" { + if receiver.consumerCount() > 1 { + return errors.New("cannot start receiver: multiple consumers were attached, but encoding was specified") + } + encodingID := convertEncoding(receiver.config.Encoding) + if encodingID == unknown { + extensionID := component.ID{} + err := extensionID.UnmarshalText([]byte(receiver.config.Encoding)) + if err != nil { + return errors.New("cannot start receiver: neither a 
build in encoder, or an extension") + } + extensions := host.GetExtensions() + if extension, ok := extensions[extensionID]; ok { + if receiver.tracesConsumer != nil { + receiver.tracesUnmarshaler, ok = extension.(encoding.TracesUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a trace unmarshaler", extensionID) + } + } + if receiver.logsConsumer != nil { + receiver.logsUnmarshaler, ok = extension.(encoding.LogsUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a logs unmarshaler", extensionID) + } + } + if receiver.metricsConsumer != nil { + receiver.metricsUnmarshaler, ok = extension.(encoding.MetricsUnmarshalerExtension) + if !ok { + return fmt.Errorf("cannot start receiver: extension %q is not a metrics unmarshaler", extensionID) + } + } + } else { + return fmt.Errorf("cannot start receiver: extension %q not found", extensionID) + } + } else { + if receiver.tracesConsumer != nil { + switch encodingID { + case otlpProtoTrace: + receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} + default: + return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for traces", receiver.config.Encoding) + } + } + if receiver.logsConsumer != nil { + switch encodingID { + case otlpProtoLog: + receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} + case rawTextLog: + receiver.logsUnmarshaler = unmarshalLogStrings{} + case cloudLogging: + receiver.logsUnmarshaler = unmarshalCloudLoggingLogEntry{} + default: + return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for logs", receiver.config.Encoding) + } + } + if receiver.metricsConsumer != nil { + switch encodingID { + case otlpProtoMetric: + receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} + default: + return fmt.Errorf("cannot start receiver: build in encoding %s is not supported for metrics", receiver.config.Encoding) + } + } + } + createHandlerFn = receiver.createReceiverHandler + 
} else { + // we will rely on the attributes of the message to determine the signal, so we need all proto unmarshalers + receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} + receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} + receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} + createHandlerFn = receiver.createMultiplexingReceiverHandler + } + var startErr error receiver.startOnce.Do(func() { copts := receiver.generateClientOptions() @@ -102,15 +193,12 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, _ component.Host) err } receiver.client = client - err = receiver.createReceiverHandler(ctx) + err = createHandlerFn(ctx) if err != nil { startErr = fmt.Errorf("failed to create ReceiverHandler: %w", err) return } }) - receiver.tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} - receiver.metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} - receiver.logsUnmarshaler = &plog.ProtoUnmarshaler{} return startErr } @@ -132,13 +220,9 @@ func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { return err } -func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *pubsubpb.ReceivedMessage) error { - if receiver.logsConsumer == nil { - return nil - } - data := string(message.Message.Data) - timestamp := message.GetMessage().PublishTime +type unmarshalLogStrings struct{} +func (unmarshalLogStrings) UnmarshalLogs(data []byte) (plog.Logs, error) { out := plog.NewLogs() logs := out.ResourceLogs() rls := logs.AppendEmpty() @@ -146,22 +230,34 @@ func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, message *p ills := rls.ScopeLogs().AppendEmpty() lr := ills.LogRecords().AppendEmpty() - lr.Body().SetStr(data) - lr.SetTimestamp(pcommon.NewTimestampFromTime(timestamp.AsTime())) + lr.Body().SetStr(string(data)) + return out, nil +} + +func (receiver *pubsubReceiver) handleLogStrings(ctx context.Context, payload []byte) error { + if receiver.logsConsumer == nil { + return nil + } + unmarshall := unmarshalLogStrings{} + 
out, err := unmarshall.UnmarshalLogs(payload) + if err != nil { + return err + } return receiver.logsConsumer.ConsumeLogs(ctx, out) } -func (receiver *pubsubReceiver) handleCloudLoggingLogEntry(ctx context.Context, message *pubsubpb.ReceivedMessage) error { - resource, lr, err := internal.TranslateLogEntry(ctx, receiver.logger, message.Message.Data) +type unmarshalCloudLoggingLogEntry struct{} + +func (unmarshalCloudLoggingLogEntry) UnmarshalLogs(data []byte) (plog.Logs, error) { + resource, lr, err := internal.TranslateLogEntry(data) + out := plog.NewLogs() lr.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) if err != nil { - receiver.logger.Error("got an error", zap.Error(err)) - return err + return out, err } - out := plog.NewLogs() logs := out.ResourceLogs() rls := logs.AppendEmpty() resource.CopyTo(rls.Resource()) @@ -169,10 +265,10 @@ func (receiver *pubsubReceiver) handleCloudLoggingLogEntry(ctx context.Context, ills := rls.ScopeLogs().AppendEmpty() lr.CopyTo(ills.LogRecords().AppendEmpty()) - return receiver.logsConsumer.ConsumeLogs(ctx, out) + return out, nil } -func decompress(payload []byte, compression compression) ([]byte, error) { +func decompress(payload []byte, compression buildInCompression) ([]byte, error) { if compression == gZip { reader, err := gzip.NewReader(bytes.NewReader(payload)) if err != nil { @@ -183,7 +279,7 @@ func decompress(payload []byte, compression compression) ([]byte, error) { return payload, nil } -func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression compression) error { +func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return err @@ -199,7 +295,7 @@ func (receiver *pubsubReceiver) handleTrace(ctx context.Context, payload []byte, return nil } -func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression 
compression) error { +func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return err @@ -215,7 +311,7 @@ func (receiver *pubsubReceiver) handleMetric(ctx context.Context, payload []byte return nil } -func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression compression) error { +func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, compression buildInCompression) error { payload, err := decompress(payload, compression) if err != nil { return err @@ -231,9 +327,9 @@ func (receiver *pubsubReceiver) handleLog(ctx context.Context, payload []byte, c return nil } -func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (encoding, compression) { - otlpEncoding := unknown - otlpCompression := uncompressed +func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (otlpEncoding buildInEncoding, otlpCompression buildInCompression) { + otlpEncoding = unknown + otlpCompression = uncompressed ceType := attributes["ce-type"] ceContentType := attributes["content-type"] @@ -251,18 +347,7 @@ func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (en } if otlpEncoding == unknown && receiver.config.Encoding != "" { - switch receiver.config.Encoding { - case "otlp_proto_trace": - otlpEncoding = otlpProtoTrace - case "otlp_proto_metric": - otlpEncoding = otlpProtoMetric - case "otlp_proto_log": - otlpEncoding = otlpProtoLog - case "cloud_logging": - otlpEncoding = cloudLogging - case "raw_text": - otlpEncoding = rawTextLog - } + otlpEncoding = convertEncoding(receiver.config.Encoding) } ceContentEncoding := attributes["content-encoding"] @@ -275,10 +360,26 @@ func (receiver *pubsubReceiver) detectEncoding(attributes map[string]string) (en otlpCompression = gZip } } - return otlpEncoding, otlpCompression + return } -func (receiver 
*pubsubReceiver) createReceiverHandler(ctx context.Context) error { +func convertEncoding(encodingConfig string) (encoding buildInEncoding) { + switch encodingConfig { + case "otlp_proto_trace": + return otlpProtoTrace + case "otlp_proto_metric": + return otlpProtoMetric + case "otlp_proto_log": + return otlpProtoLog + case "cloud_logging": + return cloudLogging + case "raw_text": + return rawTextLog + } + return unknown +} + +func (receiver *pubsubReceiver) createMultiplexingReceiverHandler(ctx context.Context) error { var err error receiver.handler, err = internal.NewHandler( ctx, @@ -303,16 +404,14 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error if receiver.logsConsumer != nil { return receiver.handleLog(ctx, payload, compression) } - case cloudLogging: + case rawTextLog: if receiver.logsConsumer != nil { - return receiver.handleCloudLoggingLogEntry(ctx, message) + return receiver.handleLogStrings(ctx, payload) } - case rawTextLog: - return receiver.handleLogStrings(ctx, message) - case unknown: + default: return errors.New("unknown encoding") } - return errors.New("unknown encoding") + return nil }) if err != nil { return err @@ -320,3 +419,40 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error receiver.handler.RecoverableStream(ctx) return nil } + +func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error { + var err error + var handlerFn func(context.Context, *pubsubpb.ReceivedMessage) error + compression := uncompressed + if receiver.tracesConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleTrace(ctx, payload, compression) + } + } + if receiver.logsConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleLog(ctx, payload, compression) + } + } + if 
receiver.metricsConsumer != nil { + handlerFn = func(ctx context.Context, message *pubsubpb.ReceivedMessage) error { + payload := message.Message.Data + return receiver.handleMetric(ctx, payload, compression) + } + } + + receiver.handler, err = internal.NewHandler( + ctx, + receiver.logger, + receiver.client, + receiver.config.ClientID, + receiver.config.Subscription, + handlerFn) + if err != nil { + return err + } + receiver.handler.RecoverableStream(ctx) + return nil +} diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index 01ed07c3f7ae..72eca61b1315 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" @@ -24,13 +25,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/testdata" ) -func TestStartReceiverNoSubscription(t *testing.T) { - ctx := context.Background() - // Start a fake server running locally. 
+func createBaseReceiver() (*pstest.Server, *pubsubReceiver) { srv := pstest.NewServer() - defer srv.Close() core, _ := observer.New(zap.WarnLevel) - receiver := &pubsubReceiver{ + return srv, &pubsubReceiver{ logger: zap.New(core), userAgent: "test-user-agent", @@ -44,15 +42,46 @@ func TestStartReceiverNoSubscription(t *testing.T) { Subscription: "projects/my-project/subscriptions/otlp", }, } +} + +type fakeUnmarshalLog struct{} + +func (fakeUnmarshalLog) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (fakeUnmarshalLog) Shutdown(_ context.Context) error { + return nil +} + +func (fakeUnmarshalLog) UnmarshalLogs(_ []byte) (plog.Logs, error) { + return plog.Logs{}, nil +} + +type fakeHost struct{} + +func (fakeHost) GetExtensions() map[component.ID]component.Component { + ext := make(map[component.ID]component.Component) + extensionID := component.ID{} + _ = extensionID.UnmarshalText([]byte("text_encoding")) + ext[extensionID] = fakeUnmarshalLog{} + return ext +} + +func TestStartReceiverNoSubscription(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() defer func() { + assert.NoError(t, srv.Close()) assert.NoError(t, receiver.Shutdown(ctx)) }() + receiver.tracesConsumer = consumertest.NewNop() receiver.metricsConsumer = consumertest.NewNop() receiver.logsConsumer = consumertest.NewNop() // No error is thrown as the stream is handled async, // no locks should be kept though - assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) } func TestReceiver(t *testing.T) { @@ -103,14 +132,14 @@ func TestReceiver(t *testing.T) { metricsConsumer: metricSink, logsConsumer: logSink, } - assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) receiver.tracesConsumer = traceSink receiver.metricsConsumer = metricSink receiver.logsConsumer = logSink // No error is thrown as the stream is handled async, // no locks should be kept though - 
assert.NoError(t, receiver.Start(ctx, nil)) + assert.NoError(t, receiver.Start(ctx, fakeHost{})) time.Sleep(1 * time.Second) @@ -156,3 +185,125 @@ func TestReceiver(t *testing.T) { assert.NoError(t, receiver.Shutdown(ctx)) assert.NoError(t, receiver.Shutdown(ctx)) } + +func TestEncodingMultipleConsumersForAnEncoding(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.metricsConsumer = consumertest.NewNop() + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "foo" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "multiple consumers were attached") +} + +func TestEncodingBuildInProtoTrace(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_trace" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.NotNil(t, receiver.tracesConsumer) + assert.Nil(t, receiver.metricsConsumer) + assert.Nil(t, receiver.logsConsumer) +} + +func TestEncodingBuildInProtoMetric(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.metricsConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_metric" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer) + assert.NotNil(t, receiver.metricsConsumer) + assert.Nil(t, receiver.logsConsumer) +} + +func TestEncodingBuildInProtoLog(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, 
receiver.Shutdown(ctx)) + }() + + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_log" + + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer) + assert.Nil(t, receiver.metricsConsumer) + assert.NotNil(t, receiver.logsConsumer) +} + +func TestEncodingConsumerMismatch(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "otlp_proto_log" + + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "build in encoding otlp_proto_log is not supported for traces") +} + +func TestEncodingNotFound(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "foo" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "extension \"foo\" not found") +} + +func TestEncodingExtensionMismatch(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.tracesConsumer = consumertest.NewNop() + receiver.config.Encoding = "text_encoding" + assert.ErrorContains(t, receiver.Start(ctx, fakeHost{}), "extension \"text_encoding\" is not a trace unmarshaler") +} + +func TestEncodingExtension(t *testing.T) { + ctx := context.Background() + srv, receiver := createBaseReceiver() + defer func() { + assert.NoError(t, srv.Close()) + assert.NoError(t, receiver.Shutdown(ctx)) + }() + + receiver.logsConsumer = consumertest.NewNop() + receiver.config.Encoding = "text_encoding" + assert.NoError(t, receiver.Start(ctx, fakeHost{})) + assert.Nil(t, receiver.tracesConsumer)
+ assert.Nil(t, receiver.metricsConsumer) + assert.NotNil(t, receiver.logsConsumer) +}