From 3f1c6191271d47cee16970177d2832f3f4684a1c Mon Sep 17 00:00:00 2001 From: Evgeniy Zuikin Date: Sun, 4 Aug 2024 11:04:03 +0700 Subject: [PATCH] [exporter/kafka] Implement partitioning by resource attributes for logs (#33230) --- .../kafka-partition-logs-by-resources.yaml | 27 ++++ exporter/kafkaexporter/README.md | 1 + exporter/kafkaexporter/config.go | 2 + exporter/kafkaexporter/config_test.go | 3 + exporter/kafkaexporter/factory.go | 45 +------ exporter/kafkaexporter/factory_test.go | 72 +--------- exporter/kafkaexporter/kafka_exporter.go | 34 ++--- exporter/kafkaexporter/kafka_exporter_test.go | 42 +++--- exporter/kafkaexporter/marshaler.go | 75 +++++++---- exporter/kafkaexporter/marshaler_test.go | 126 +++++++++++++----- exporter/kafkaexporter/pdata_marshaler.go | 104 ++++++++------- exporter/kafkaexporter/testdata/config.yaml | 1 + 12 files changed, 271 insertions(+), 261 deletions(-) create mode 100644 .chloggen/kafka-partition-logs-by-resources.yaml diff --git a/.chloggen/kafka-partition-logs-by-resources.yaml b/.chloggen/kafka-partition-logs-by-resources.yaml new file mode 100644 index 000000000000..675c45bbac86 --- /dev/null +++ b/.chloggen/kafka-partition-logs-by-resources.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: add an ability to partition logs based on resource attributes. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33229] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 02137a862092..c0ea3212cd9e 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -38,6 +38,7 @@ The following settings can be optionally configured: - `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded. - `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default. - `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka. +- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka. - `auth` - `plain_text` - `username`: The username to use. 
diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index d048b27a88f9..e1c433a268d8 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -53,6 +53,8 @@ type Config struct { PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"` + PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"` + // Metadata is the namespace for metadata management properties used by the // Client, and shared by the Producer/Consumer. Metadata Metadata `mapstructure:"metadata"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index 236125c3db35..b3542d236438 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, PartitionMetricsByResourceAttributes: true, + PartitionLogsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -114,6 +115,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, PartitionMetricsByResourceAttributes: true, + PartitionLogsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", Authentication: kafka.Authentication{ @@ -168,6 +170,7 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", PartitionTracesByID: true, PartitionMetricsByResourceAttributes: true, + PartitionLogsByResourceAttributes: true, Brokers: []string{"foo:123", "bar:456"}, ClientID: "test_client_id", ResolveCanonicalBootstrapServersOnly: true, diff --git a/exporter/kafkaexporter/factory.go b/exporter/kafkaexporter/factory.go index cc4fb3d55023..3334e3b268fc 100644 --- a/exporter/kafkaexporter/factory.go +++ b/exporter/kafkaexporter/factory.go @@ -40,45 +40,16 @@ const ( defaultFluxMaxMessages = 0 // 
partitioning metrics by resource attributes is disabled by default defaultPartitionMetricsByResourceAttributesEnabled = false + // partitioning logs by resource attributes is disabled by default + defaultPartitionLogsByResourceAttributesEnabled = false ) // FactoryOption applies changes to kafkaExporterFactory. type FactoryOption func(factory *kafkaExporterFactory) -// withTracesMarshalers adds tracesMarshalers. -func withTracesMarshalers(tracesMarshalers ...TracesMarshaler) FactoryOption { - return func(factory *kafkaExporterFactory) { - for _, marshaler := range tracesMarshalers { - factory.tracesMarshalers[marshaler.Encoding()] = marshaler - } - } -} - -// withMetricsMarshalers adds additional metric marshalers to the exporter factory. -func withMetricsMarshalers(metricMarshalers ...MetricsMarshaler) FactoryOption { - return func(factory *kafkaExporterFactory) { - for _, marshaler := range metricMarshalers { - factory.metricsMarshalers[marshaler.Encoding()] = marshaler - } - } -} - -// withLogsMarshalers adds additional log marshalers to the exporter factory. -func withLogsMarshalers(logsMarshalers ...LogsMarshaler) FactoryOption { - return func(factory *kafkaExporterFactory) { - for _, marshaler := range logsMarshalers { - factory.logsMarshalers[marshaler.Encoding()] = marshaler - } - } -} - // NewFactory creates Kafka exporter factory. 
func NewFactory(options ...FactoryOption) exporter.Factory { - f := &kafkaExporterFactory{ - tracesMarshalers: tracesMarshalers(), - metricsMarshalers: metricsMarshalers(), - logsMarshalers: logsMarshalers(), - } + f := &kafkaExporterFactory{} for _, o := range options { o(f) } @@ -102,6 +73,7 @@ func createDefaultConfig() component.Config { Topic: "", Encoding: defaultEncoding, PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled, + PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled, Metadata: Metadata{ Full: defaultMetadataFull, Retry: MetadataRetry{ @@ -119,9 +91,6 @@ func createDefaultConfig() component.Config { } type kafkaExporterFactory struct { - tracesMarshalers map[string]TracesMarshaler - metricsMarshalers map[string]MetricsMarshaler - logsMarshalers map[string]LogsMarshaler } func (f *kafkaExporterFactory) createTracesExporter( @@ -136,7 +105,7 @@ func (f *kafkaExporterFactory) createTracesExporter( if oCfg.Encoding == "otlp_json" { set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment") } - exp, err := newTracesExporter(oCfg, set, f.tracesMarshalers) + exp, err := newTracesExporter(oCfg, set) if err != nil { return nil, err } @@ -167,7 +136,7 @@ func (f *kafkaExporterFactory) createMetricsExporter( if oCfg.Encoding == "otlp_json" { set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment") } - exp, err := newMetricsExporter(oCfg, set, f.metricsMarshalers) + exp, err := newMetricsExporter(oCfg, set) if err != nil { return nil, err } @@ -198,7 +167,7 @@ func (f *kafkaExporterFactory) createLogsExporter( if oCfg.Encoding == "otlp_json" { set.Logger.Info("otlp_json is considered experimental and should not be used in a production environment") } - exp, err := newLogsExporter(oCfg, set, f.logsMarshalers) + exp, err := newLogsExporter(oCfg, set) if err != nil { return nil, err } diff --git 
a/exporter/kafkaexporter/factory_test.go b/exporter/kafkaexporter/factory_test.go index d927692e4fd1..cc0df18074e5 100644 --- a/exporter/kafkaexporter/factory_test.go +++ b/exporter/kafkaexporter/factory_test.go @@ -5,45 +5,15 @@ package kafkaexporter import ( "context" - "errors" "net" "testing" - "github.com/IBM/sarama" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/exporter/exportertest" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/pdata/ptrace" ) -// data is a simple means of allowing -// interchangeability between the -// different marshaller types -type data interface { - ptrace.Traces | plog.Logs | pmetric.Metrics -} - -type mockMarshaler[Data data] struct { - consume func(d Data, topic string) ([]*sarama.ProducerMessage, error) - encoding string -} - -func (mm *mockMarshaler[Data]) Encoding() string { return mm.encoding } - -func (mm *mockMarshaler[Data]) Marshal(d Data, topic string) ([]*sarama.ProducerMessage, error) { - if mm.consume != nil { - return mm.consume(d, topic) - } - return nil, errors.New("not implemented") -} - -func newMockMarshaler[Data data](encoding string) *mockMarshaler[Data] { - return &mockMarshaler[Data]{encoding: encoding} -} - // applyConfigOption is used to modify values of the // the default exporter config to make it easier to // use the return in a test table set up @@ -100,18 +70,6 @@ func TestCreateMetricExporter(t *testing.T) { marshalers: nil, err: nil, }, - { - name: "custom_encoding", - conf: applyConfigOption(func(conf *Config) { - // Disabling broker check to ensure encoding work - conf.Metadata.Full = false - conf.Encoding = "custom" - }), - marshalers: []MetricsMarshaler{ - newMockMarshaler[pmetric.Metrics]("custom"), - }, - err: nil, - }, } for _, tc := range tests { @@ -119,7 +77,7 @@ func TestCreateMetricExporter(t 
*testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - f := NewFactory(withMetricsMarshalers(tc.marshalers...)) + f := NewFactory() exporter, err := f.CreateMetricsExporter( context.Background(), exportertest.NewNopSettings(), @@ -177,18 +135,6 @@ func TestCreateLogExporter(t *testing.T) { marshalers: nil, err: nil, }, - { - name: "custom_encoding", - conf: applyConfigOption(func(conf *Config) { - // Disabling broker check to ensure encoding work - conf.Metadata.Full = false - conf.Encoding = "custom" - }), - marshalers: []LogsMarshaler{ - newMockMarshaler[plog.Logs]("custom"), - }, - err: nil, - }, } for _, tc := range tests { @@ -196,7 +142,7 @@ func TestCreateLogExporter(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - f := NewFactory(withLogsMarshalers(tc.marshalers...)) + f := NewFactory() exporter, err := f.CreateLogsExporter( context.Background(), exportertest.NewNopSettings(), @@ -254,18 +200,6 @@ func TestCreateTraceExporter(t *testing.T) { marshalers: nil, err: nil, }, - { - name: "custom_encoding", - conf: applyConfigOption(func(conf *Config) { - // Disabling broker check to ensure encoding work - conf.Metadata.Full = false - conf.Encoding = "custom" - }), - marshalers: []TracesMarshaler{ - newMockMarshaler[ptrace.Traces]("custom"), - }, - err: nil, - }, } for _, tc := range tests { @@ -273,7 +207,7 @@ func TestCreateTraceExporter(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - f := NewFactory(withTracesMarshalers(tc.marshalers...)) + f := NewFactory() exporter, err := f.CreateTracesExporter( context.Background(), exportertest.NewNopSettings(), diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index f15b8b046ebd..5e57e5d63678 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -204,16 +204,15 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) { return producer, nil } -func 
newMetricsExporter(config Config, set exporter.Settings, marshalers map[string]MetricsMarshaler) (*kafkaMetricsProducer, error) { - marshaler := marshalers[config.Encoding] +func newMetricsExporter(config Config, set exporter.Settings) (*kafkaMetricsProducer, error) { + marshaler, err := createMetricMarshaler(config) + if err != nil { + return nil, err + } + if marshaler == nil { return nil, errUnrecognizedEncoding } - if config.PartitionMetricsByResourceAttributes { - if keyableMarshaler, ok := marshaler.(KeyableMetricsMarshaler); ok { - keyableMarshaler.Key() - } - } return &kafkaMetricsProducer{ cfg: config, @@ -224,15 +223,10 @@ func newMetricsExporter(config Config, set exporter.Settings, marshalers map[str } // newTracesExporter creates Kafka exporter. -func newTracesExporter(config Config, set exporter.Settings, marshalers map[string]TracesMarshaler) (*kafkaTracesProducer, error) { - marshaler := marshalers[config.Encoding] - if marshaler == nil { - return nil, errUnrecognizedEncoding - } - if config.PartitionTracesByID { - if keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler); ok { - keyableMarshaler.Key() - } +func newTracesExporter(config Config, set exporter.Settings) (*kafkaTracesProducer, error) { + marshaler, err := createTracesMarshaler(config) + if err != nil { + return nil, err } return &kafkaTracesProducer{ @@ -242,10 +236,10 @@ func newTracesExporter(config Config, set exporter.Settings, marshalers map[stri }, nil } -func newLogsExporter(config Config, set exporter.Settings, marshalers map[string]LogsMarshaler) (*kafkaLogsProducer, error) { - marshaler := marshalers[config.Encoding] - if marshaler == nil { - return nil, errUnrecognizedEncoding +func newLogsExporter(config Config, set exporter.Settings) (*kafkaLogsProducer, error) { + marshaler, err := createLogMarshaler(config) + if err != nil { + return nil, err } return &kafkaLogsProducer{ diff --git a/exporter/kafkaexporter/kafka_exporter_test.go 
b/exporter/kafkaexporter/kafka_exporter_test.go index 4da9969b9b74..aff1fdb1fcea 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -26,7 +26,7 @@ import ( func TestNewExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} - texp, err := newTracesExporter(c, exportertest.NewNopSettings(), tracesMarshalers()) + texp, err := newTracesExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) @@ -34,14 +34,14 @@ func TestNewExporter_err_version(t *testing.T) { func TestNewExporter_err_encoding(t *testing.T) { c := Config{Encoding: "foo"} - texp, err := newTracesExporter(c, exportertest.NewNopSettings(), tracesMarshalers()) + texp, err := newTracesExporter(c, exportertest.NewNopSettings()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, texp) } func TestNewMetricsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} - mexp, err := newMetricsExporter(c, exportertest.NewNopSettings(), metricsMarshalers()) + mexp, err := newMetricsExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = mexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) @@ -49,21 +49,21 @@ func TestNewMetricsExporter_err_version(t *testing.T) { func TestNewMetricsExporter_err_encoding(t *testing.T) { c := Config{Encoding: "bar"} - mexp, err := newMetricsExporter(c, exportertest.NewNopSettings(), metricsMarshalers()) + mexp, err := newMetricsExporter(c, exportertest.NewNopSettings()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, mexp) } func TestNewMetricsExporter_err_traces_encoding(t *testing.T) { c := Config{Encoding: "jaeger_proto"} - mexp, err := newMetricsExporter(c, exportertest.NewNopSettings(), metricsMarshalers()) + mexp, err := newMetricsExporter(c, 
exportertest.NewNopSettings()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, mexp) } func TestNewLogsExporter_err_version(t *testing.T) { c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding} - lexp, err := newLogsExporter(c, exportertest.NewNopSettings(), logsMarshalers()) + lexp, err := newLogsExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = lexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) @@ -71,14 +71,14 @@ func TestNewLogsExporter_err_version(t *testing.T) { func TestNewLogsExporter_err_encoding(t *testing.T) { c := Config{Encoding: "bar"} - mexp, err := newLogsExporter(c, exportertest.NewNopSettings(), logsMarshalers()) + mexp, err := newLogsExporter(c, exportertest.NewNopSettings()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, mexp) } func TestNewLogsExporter_err_traces_encoding(t *testing.T) { c := Config{Encoding: "jaeger_proto"} - mexp, err := newLogsExporter(c, exportertest.NewNopSettings(), logsMarshalers()) + mexp, err := newLogsExporter(c, exportertest.NewNopSettings()) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) assert.Nil(t, mexp) } @@ -101,17 +101,17 @@ func TestNewExporter_err_auth_type(t *testing.T) { Compression: "none", }, } - texp, err := newTracesExporter(c, exportertest.NewNopSettings(), tracesMarshalers()) + texp, err := newTracesExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - mexp, err := newMetricsExporter(c, exportertest.NewNopSettings(), metricsMarshalers()) + mexp, err := newMetricsExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = mexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") - lexp, err := newLogsExporter(c, 
exportertest.NewNopSettings(), logsMarshalers()) + lexp, err := newLogsExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = lexp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) @@ -126,7 +126,7 @@ func TestNewExporter_err_compression(t *testing.T) { Compression: "idk", }, } - texp, err := newTracesExporter(c, exportertest.NewNopSettings(), tracesMarshalers()) + texp, err := newTracesExporter(c, exportertest.NewNopSettings()) require.NoError(t, err) err = texp.start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) @@ -140,7 +140,7 @@ func TestTracesPusher(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -159,7 +159,7 @@ func TestTracesPusher_attr(t *testing.T) { TopicFromAttribute: "kafka_topic", }, producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -176,7 +176,7 @@ func TestTracesPusher_err(t *testing.T) { p := kafkaTracesProducer{ producer: producer, - marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), logger: zap.NewNop(), } t.Cleanup(func() { @@ -206,7 +206,7 @@ func TestMetricsDataPusher(t *testing.T) { p := kafkaMetricsProducer{ producer: producer, - marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -225,7 +225,7 @@ func 
TestMetricsDataPusher_attr(t *testing.T) { TopicFromAttribute: "kafka_topic", }, producer: producer, - marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -242,7 +242,7 @@ func TestMetricsDataPusher_err(t *testing.T) { p := kafkaMetricsProducer{ producer: producer, - marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false), logger: zap.NewNop(), } t.Cleanup(func() { @@ -272,7 +272,7 @@ func TestLogsDataPusher(t *testing.T) { p := kafkaLogsProducer{ producer: producer, - marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -291,7 +291,7 @@ func TestLogsDataPusher_attr(t *testing.T) { TopicFromAttribute: "kafka_topic", }, producer: producer, - marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false), } t.Cleanup(func() { require.NoError(t, p.Close(context.Background())) @@ -308,7 +308,7 @@ func TestLogsDataPusher_err(t *testing.T) { p := kafkaLogsProducer{ producer: producer, - marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding), + marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false), logger: zap.NewNop(), } t.Cleanup(func() { diff --git a/exporter/kafkaexporter/marshaler.go b/exporter/kafkaexporter/marshaler.go index 4acb625407fd..89b4bcc3893c 100644 --- a/exporter/kafkaexporter/marshaler.go +++ b/exporter/kafkaexporter/marshaler.go @@ -39,42 +39,61 @@ type LogsMarshaler interface { Encoding() string } -// tracesMarshalers returns map of 
supported encodings with TracesMarshaler. -func tracesMarshalers() map[string]TracesMarshaler { - otlpPb := newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding) - otlpJSON := newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json") - zipkinProto := newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto") - zipkinJSON := newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json") +// creates TracesMarshaler based on the provided config +func createTracesMarshaler(config Config) (TracesMarshaler, error) { + encoding := config.Encoding + partitionTracesByID := config.PartitionTracesByID + jaegerProto := jaegerMarshaler{marshaler: jaegerProtoSpanMarshaler{}} jaegerJSON := jaegerMarshaler{marshaler: newJaegerJSONMarshaler()} - return map[string]TracesMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, - zipkinProto.Encoding(): zipkinProto, - zipkinJSON.Encoding(): zipkinJSON, - jaegerProto.Encoding(): jaegerProto, - jaegerJSON.Encoding(): jaegerJSON, + + switch encoding { + case defaultEncoding: + return newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, partitionTracesByID), nil + case "otlp_json": + return newPdataTracesMarshaler(&ptrace.JSONMarshaler{}, "otlp_json", partitionTracesByID), nil + case "zipkin_proto": + return newPdataTracesMarshaler(zipkinv2.NewProtobufTracesMarshaler(), "zipkin_proto", partitionTracesByID), nil + case "zipkin_json": + return newPdataTracesMarshaler(zipkinv2.NewJSONTracesMarshaler(), "zipkin_json", partitionTracesByID), nil + case jaegerProtoSpanMarshaler{}.encoding(): + return jaegerProto, nil + case jaegerJSON.Encoding(): + return jaegerJSON, nil + default: + return nil, errUnrecognizedEncoding } + } -// metricsMarshalers returns map of supported encodings and MetricsMarshaler -func metricsMarshalers() map[string]MetricsMarshaler { - otlpPb := newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding) - otlpJSON := 
newPdataMetricsMarshaler(&pmetric.JSONMarshaler{}, "otlp_json") - return map[string]MetricsMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, +// creates MetricsMarshaler based on the provided config +func createMetricMarshaler(config Config) (MetricsMarshaler, error) { + encoding := config.Encoding + partitionMetricsByResources := config.PartitionMetricsByResourceAttributes + switch encoding { + case defaultEncoding: + return newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, partitionMetricsByResources), nil + case "otlp_json": + return newPdataMetricsMarshaler(&pmetric.JSONMarshaler{}, "otlp_json", partitionMetricsByResources), nil + default: + return nil, errUnrecognizedEncoding } } -// logsMarshalers returns map of supported encodings and LogsMarshaler -func logsMarshalers() map[string]LogsMarshaler { - otlpPb := newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding) - otlpJSON := newPdataLogsMarshaler(&plog.JSONMarshaler{}, "otlp_json") +// creates LogsMarshaler based on the provided config +func createLogMarshaler(config Config) (LogsMarshaler, error) { + encoding := config.Encoding + partitionLogsByAttributes := config.PartitionLogsByResourceAttributes + raw := newRawMarshaler() - return map[string]LogsMarshaler{ - otlpPb.Encoding(): otlpPb, - otlpJSON.Encoding(): otlpJSON, - raw.Encoding(): raw, + switch encoding { + case defaultEncoding: + return newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, partitionLogsByAttributes), nil + case "otlp_json": + return newPdataLogsMarshaler(&plog.JSONMarshaler{}, "otlp_json", partitionLogsByAttributes), nil + case raw.Encoding(): + return raw, nil + default: + return nil, errUnrecognizedEncoding } } diff --git a/exporter/kafkaexporter/marshaler_test.go b/exporter/kafkaexporter/marshaler_test.go index 5f40379d75a2..05bb0ecd8712 100644 --- a/exporter/kafkaexporter/marshaler_test.go +++ b/exporter/kafkaexporter/marshaler_test.go @@ -14,6 +14,7 @@ import ( 
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" @@ -28,12 +29,12 @@ func TestDefaultTracesMarshalers(t *testing.T) { "jaeger_proto", "jaeger_json", } - marshalers := tracesMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) for _, e := range expectedEncodings { t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) + m, err := createTracesMarshaler(Config{ + Encoding: e, + }) + require.Nil(t, err) assert.NotNil(t, m) }) } @@ -44,12 +45,12 @@ func TestDefaultMetricsMarshalers(t *testing.T) { "otlp_proto", "otlp_json", } - marshalers := metricsMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) for _, e := range expectedEncodings { t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) + m, err := createMetricMarshaler(Config{ + Encoding: e, + }) + require.Nil(t, err) assert.NotNil(t, m) }) } @@ -61,12 +62,12 @@ func TestDefaultLogsMarshalers(t *testing.T) { "otlp_json", "raw", } - marshalers := logsMarshalers() - assert.Equal(t, len(expectedEncodings), len(marshalers)) for _, e := range expectedEncodings { t.Run(e, func(t *testing.T) { - m, ok := marshalers[e] - require.True(t, ok) + m, err := createLogMarshaler(Config{ + Encoding: e, + }) + require.Nil(t, err) assert.NotNil(t, m) }) } @@ -75,17 +76,17 @@ func TestDefaultLogsMarshalers(t *testing.T) { func TestOTLPMetricsJsonMarshaling(t *testing.T) { tests := []struct { name string - keyEnabled bool + partitionByResources bool messagePartitionKeys []sarama.Encoder }{ { name: "partitioning_disabled", - keyEnabled: false, + partitionByResources: false, messagePartitionKeys: []sarama.Encoder{nil}, }, { - name: "partitioning_enabled", - keyEnabled: true, + name: 
"partitioning_enabled", + partitionByResources: true, messagePartitionKeys: []sarama.Encoder{ sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, @@ -116,16 +117,76 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) { r1.Attributes().PutStr("service.name", "my_service_name") r1.CopyTo(metric.ResourceMetrics().AppendEmpty().Resource()) - standardMarshaler := metricsMarshalers()["otlp_json"] - keyableMarshaler, ok := standardMarshaler.(KeyableMetricsMarshaler) - require.True(t, ok, "Must be a KeyableMetricsMarshaler") - if tt.keyEnabled { - keyableMarshaler.Key() - } + marshaler, err := createMetricMarshaler( + Config{ + Encoding: "otlp_json", + PartitionMetricsByResourceAttributes: tt.partitionByResources, + }) + require.Nil(t, err) - msgs, err := standardMarshaler.Marshal(metric, "KafkaTopicX") + msgs, err := marshaler.Marshal(metric, "KafkaTopicX") require.NoError(t, err, "Must have marshaled the data without error") + require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs)) + + for i := 0; i < len(tt.messagePartitionKeys); i++ { + require.Equal(t, tt.messagePartitionKeys[i], msgs[i].Key, "message %d has incorrect key", i) + } + }) + } +} + +func TestOTLPLogsJsonMarshaling(t *testing.T) { + tests := []struct { + name string + partitionByResources bool + messagePartitionKeys []sarama.Encoder + }{ + { + name: "partitioning_disabled", + partitionByResources: false, + messagePartitionKeys: []sarama.Encoder{nil}, + }, + { + name: "partitioning_enabled", + partitionByResources: true, + messagePartitionKeys: []sarama.Encoder{ + sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16}, + sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 
0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + log := plog.NewLogs() + r := pcommon.NewResource() + r.Attributes().PutStr("service.name", "my_service_name") + r.Attributes().PutStr("service.instance.id", "kek_x_1") + r.CopyTo(log.ResourceLogs().AppendEmpty().Resource()) + + rm := log.ResourceLogs().At(0) + rm.SetSchemaUrl(conventions.SchemaURL) + sl := rm.ScopeLogs().AppendEmpty() + plog.NewScopeLogs() + l := sl.LogRecords().AppendEmpty() + l.SetSeverityText("INFO") + l.SetTimestamp(pcommon.Timestamp(1)) + l.Body().SetStr("Simple log message") + + r1 := pcommon.NewResource() + r1.Attributes().PutStr("service.instance.id", "kek_x_2") + r1.Attributes().PutStr("service.name", "my_service_name") + r1.CopyTo(log.ResourceLogs().AppendEmpty().Resource()) + + marshaler, err := createLogMarshaler( + Config{ + Encoding: "otlp_json", + PartitionLogsByResourceAttributes: tt.partitionByResources, + }) + require.Nil(t, err) + + msgs, err := marshaler.Marshal(log, "KafkaTopicX") + require.NoError(t, err, "Must have marshaled the data without error") require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs)) for i := 0; i < len(tt.messagePartitionKeys); i++ { @@ -382,28 +443,25 @@ func TestOTLPTracesJsonMarshaling(t *testing.T) { tests := []struct { encoding string - keyed bool + partitionTracesByID bool numExpectedMessages int expectedJSON []any expectedMessageKey []sarama.Encoder unmarshaled any }{ {encoding: "otlp_json", numExpectedMessages: 1, expectedJSON: unkeyedOtlpJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: map[string]any{}}, - {encoding: "otlp_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}}, + {encoding: "otlp_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedOtlpJSONResult, 
expectedMessageKey: keyedMessageKey, unmarshaled: map[string]any{}}, {encoding: "zipkin_json", numExpectedMessages: 1, expectedJSON: unkeyedZipkinJSONResult, expectedMessageKey: unkeyedMessageKey, unmarshaled: []map[string]any{}}, - {encoding: "zipkin_json", keyed: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}}, + {encoding: "zipkin_json", partitionTracesByID: true, numExpectedMessages: 2, expectedJSON: keyedZipkinJSONResult, expectedMessageKey: keyedMessageKey, unmarshaled: []map[string]any{}}, } for _, test := range tests { - marshaler, ok := tracesMarshalers()[test.encoding] - require.True(t, ok, fmt.Sprintf("Must have %s marshaller", test.encoding)) - - if test.keyed { - keyableMarshaler, ok := marshaler.(KeyableTracesMarshaler) - require.True(t, ok, "Must be a KeyableTracesMarshaler") - keyableMarshaler.Key() - } + marshaler, err := createTracesMarshaler(Config{ + Encoding: test.encoding, + PartitionTracesByID: test.partitionTracesByID, + }) + require.Nil(t, err, fmt.Sprintf("Must have %s marshaler", test.encoding)) msg, err := marshaler.Marshal(traces, t.Name()) require.NoError(t, err, "Must have marshaled the data without error") diff --git a/exporter/kafkaexporter/pdata_marshaler.go b/exporter/kafkaexporter/pdata_marshaler.go index 3429cdd8316e..72a90b54fb16 100644 --- a/exporter/kafkaexporter/pdata_marshaler.go +++ b/exporter/kafkaexporter/pdata_marshaler.go @@ -15,55 +15,67 @@ import ( ) type pdataLogsMarshaler struct { - marshaler plog.Marshaler - encoding string + marshaler plog.Marshaler + encoding string + partitionedByResources bool } func (p pdataLogsMarshaler) Marshal(ld plog.Logs, topic string) ([]*sarama.ProducerMessage, error) { - bts, err := p.marshaler.MarshalLogs(ld) - if err != nil { - return nil, err - } - return []*sarama.ProducerMessage{ - { + var msgs []*sarama.ProducerMessage + if p.partitionedByResources { + logs := ld.ResourceLogs() + + for i := 
0; i < logs.Len(); i++ { + resourceMetrics := logs.At(i) + var hash = pdatautil.MapHash(resourceMetrics.Resource().Attributes()) + + newLogs := plog.NewLogs() + resourceMetrics.CopyTo(newLogs.ResourceLogs().AppendEmpty()) + + bts, err := p.marshaler.MarshalLogs(newLogs) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(bts), + Key: sarama.ByteEncoder(hash[:]), + }) + } + } else { + bts, err := p.marshaler.MarshalLogs(ld) + if err != nil { + return nil, err + } + msgs = append(msgs, &sarama.ProducerMessage{ Topic: topic, Value: sarama.ByteEncoder(bts), - }, - }, nil + }) + } + return msgs, nil } func (p pdataLogsMarshaler) Encoding() string { return p.encoding } -func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarshaler { +func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string, partitionedByResources bool) LogsMarshaler { return pdataLogsMarshaler{ - marshaler: marshaler, - encoding: encoding, + marshaler: marshaler, + encoding: encoding, + partitionedByResources: partitionedByResources, } } -// KeyableMetricsMarshaler is an extension of the MetricsMarshaler interface intended to provide partition key capabilities -// for metrics messages -type KeyableMetricsMarshaler interface { - MetricsMarshaler - Key() -} - type pdataMetricsMarshaler struct { - marshaler pmetric.Marshaler - encoding string - keyed bool -} - -// Key configures the pdataMetricsMarshaler to set the message key on the kafka messages -func (p *pdataMetricsMarshaler) Key() { - p.keyed = true + marshaler pmetric.Marshaler + encoding string + partitionedByResources bool } func (p pdataMetricsMarshaler) Marshal(ld pmetric.Metrics, topic string) ([]*sarama.ProducerMessage, error) { var msgs []*sarama.ProducerMessage - if p.keyed { + if p.partitionedByResources { metrics := ld.ResourceMetrics() for i := 0; i < metrics.Len(); i++ { @@ -101,29 +113,23 @@ func (p pdataMetricsMarshaler) 
Encoding() string { return p.encoding } -func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string) MetricsMarshaler { +func newPdataMetricsMarshaler(marshaler pmetric.Marshaler, encoding string, partitionedByResources bool) MetricsMarshaler { return &pdataMetricsMarshaler{ - marshaler: marshaler, - encoding: encoding, + marshaler: marshaler, + encoding: encoding, + partitionedByResources: partitionedByResources, } } -// KeyableTracesMarshaler is an extension of the TracesMarshaler interface intended to provide partition key capabilities -// for trace messages -type KeyableTracesMarshaler interface { - TracesMarshaler - Key() -} - type pdataTracesMarshaler struct { - marshaler ptrace.Marshaler - encoding string - keyed bool + marshaler ptrace.Marshaler + encoding string + partitionedByTraceID bool } func (p *pdataTracesMarshaler) Marshal(td ptrace.Traces, topic string) ([]*sarama.ProducerMessage, error) { var msgs []*sarama.ProducerMessage - if p.keyed { + if p.partitionedByTraceID { for _, trace := range batchpersignal.SplitTraces(td) { bts, err := p.marshaler.MarshalTraces(trace) if err != nil { @@ -154,14 +160,10 @@ func (p *pdataTracesMarshaler) Encoding() string { return p.encoding } -// Key configures the pdataTracesMarshaler to set the message key on the kafka messages -func (p *pdataTracesMarshaler) Key() { - p.keyed = true -} - -func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string) TracesMarshaler { +func newPdataTracesMarshaler(marshaler ptrace.Marshaler, encoding string, partitionedByTraceID bool) TracesMarshaler { return &pdataTracesMarshaler{ - marshaler: marshaler, - encoding: encoding, + marshaler: marshaler, + encoding: encoding, + partitionedByTraceID: partitionedByTraceID, } } diff --git a/exporter/kafkaexporter/testdata/config.yaml b/exporter/kafkaexporter/testdata/config.yaml index 7c89bea74ade..016e931394e3 100644 --- a/exporter/kafkaexporter/testdata/config.yaml +++ 
b/exporter/kafkaexporter/testdata/config.yaml @@ -14,6 +14,7 @@ kafka: timeout: 10s partition_traces_by_id: true partition_metrics_by_resource_attributes: true + partition_logs_by_resource_attributes: true auth: plain_text: username: jdoe