diff --git a/.chloggen/codeboten_add-unit.yaml b/.chloggen/codeboten_add-unit.yaml new file mode 100644 index 000000000000..53b088706a69 --- /dev/null +++ b/.chloggen/codeboten_add-unit.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: groupbytraceprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Ensure processor_groupbytrace_incomplete_releases metric has a unit. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35221] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/kafka-topic-context.yaml b/.chloggen/kafka-topic-context.yaml new file mode 100644 index 000000000000..78d794be4efa --- /dev/null +++ b/.chloggen/kafka-topic-context.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add option to supply destination topic through context. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34503, 34432] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/rm-copies-deltatorateprocessor.yaml b/.chloggen/rm-copies-deltatorateprocessor.yaml new file mode 100644 index 000000000000..5e26c4fbcb51 --- /dev/null +++ b/.chloggen/rm-copies-deltatorateprocessor.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatorateprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove unnecessary data copies. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35165] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/.chloggen/rm-copy-transformprocessor.yaml b/.chloggen/rm-copy-transformprocessor.yaml new file mode 100644 index 000000000000..074c499d91c7 --- /dev/null +++ b/.chloggen/rm-copy-transformprocessor.yaml @@ -0,0 +1,22 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: transformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Remove unnecessary data copy when transforming sum to/from gauge. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35177] + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 676015f2efe9..56dab2a7b0ae 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -146,9 +146,10 @@ internal/tools/ @open-teleme pkg/batchperresourceattr/ @open-telemetry/collector-contrib-approvers @atoulme @dmitryax pkg/batchpersignal/ @open-telemetry/collector-contrib-approvers @jpkrohling -pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick pkg/datadog/ @open-telemetry/collector-contrib-approvers @mx-psi @dineshg13 @liustanley @songy23 @mackjmr @ankitpatel96 +pkg/experimentalmetricmetadata/ @open-telemetry/collector-contrib-approvers @rmfitzpatrick pkg/golden/ @open-telemetry/collector-contrib-approvers @djaglowski @atoulme +pkg/kafka/topic/ @open-telemetry/collector-contrib-approvers @pavolloffay @MovieStoreGuy pkg/ottl/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @kentquirk @bogdandrutu @evan-bradley pkg/pdatatest/ @open-telemetry/collector-contrib-approvers @djaglowski @fatsheep9146 pkg/pdatautil/ @open-telemetry/collector-contrib-approvers @dmitryax diff --git a/.github/ISSUE_TEMPLATE/bug_report.yaml b/.github/ISSUE_TEMPLATE/bug_report.yaml index 5374e66a9a89..1ec802e83d93 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yaml +++ b/.github/ISSUE_TEMPLATE/bug_report.yaml @@ -142,8 +142,10 @@ body: - internal/tools - pkg/batchperresourceattr - pkg/batchpersignal + - pkg/datadog - pkg/experimentalmetricmetadata - pkg/golden + - pkg/kafka/topic - pkg/ottl - pkg/pdatatest - pkg/pdatautil diff --git a/.github/ISSUE_TEMPLATE/feature_request.yaml b/.github/ISSUE_TEMPLATE/feature_request.yaml index cbaad5acb6ee..3027e60c72a8 100644 --- a/.github/ISSUE_TEMPLATE/feature_request.yaml +++ b/.github/ISSUE_TEMPLATE/feature_request.yaml @@ -136,8 +136,10 @@ body: - internal/tools - pkg/batchperresourceattr - pkg/batchpersignal + - pkg/datadog - pkg/experimentalmetricmetadata - pkg/golden + - pkg/kafka/topic - pkg/ottl - pkg/pdatatest - pkg/pdatautil diff --git a/.github/ISSUE_TEMPLATE/other.yaml b/.github/ISSUE_TEMPLATE/other.yaml index 2bb37ab37089..e1e3cc57cd59 100644 --- a/.github/ISSUE_TEMPLATE/other.yaml +++ b/.github/ISSUE_TEMPLATE/other.yaml @@ -136,8 +136,10 @@ body: - internal/tools - pkg/batchperresourceattr - pkg/batchpersignal + - pkg/datadog - pkg/experimentalmetricmetadata - pkg/golden + - pkg/kafka/topic - pkg/ottl - pkg/pdatatest - pkg/pdatautil diff --git a/.github/ISSUE_TEMPLATE/unmaintained.yaml b/.github/ISSUE_TEMPLATE/unmaintained.yaml index 55b55fd1b45c..9f8c4ca17bab 100644 --- a/.github/ISSUE_TEMPLATE/unmaintained.yaml +++ b/.github/ISSUE_TEMPLATE/unmaintained.yaml @@ -141,8 +141,10 @@ body: - internal/tools - pkg/batchperresourceattr - pkg/batchpersignal + - pkg/datadog - pkg/experimentalmetricmetadata - pkg/golden + - pkg/kafka/topic - pkg/ottl - pkg/pdatatest - pkg/pdatautil diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 830cc3737be7..8a87ab0e5749 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -398,6 +398,7 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter => ../../exporter/fileexporter - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry => ../../pkg/resourcetotelemetry - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => 
../../pkg/kafka/topic - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter - github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter - github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders => ../../internal/metadataproviders diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index dc06d555e700..7e8352d30598 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -658,6 +658,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog v0.0.0-00010101000000-000000000000 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.109.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.109.0 // indirect @@ -1183,6 +1184,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourceto replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic + replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opencensusexporter => ../../exporter/opencensusexporter replace github.com/open-telemetry/opentelemetry-collector-contrib/exporter/opensearchexporter => ../../exporter/opensearchexporter diff --git a/exporter/coralogixexporter/README.md b/exporter/coralogixexporter/README.md index da07aca9cfe0..f6068e6163d9 100644 --- a/exporter/coralogixexporter/README.md +++ b/exporter/coralogixexporter/README.md @@ -86,14 +86,15 @@ exporters: Depending on your region, you might need to use a different domain. Here are the available domains: -| Region | Domain | -|---------|---------------------------------| -| USA1 | `coralogix.us` | -| USA2 | `cx498.coralogix.com` | -| APAC1 | `coralogix.in` | -| APAC2 | `coralogixsg.com` | -| EUROPE1 | `coralogix.com` | -| EUROPE2 | `eu2.coralogix.com` | +| Region | Domain | +|---------|-------------------------| +| USA1 | `coralogix.us` | +| USA2 | `cx498.coralogix.com` | +| APAC1 | `coralogix.in` | +| APAC2 | `coralogixsg.com` | +| APAC3 | `ap3.coralogix.com` | +| EUROPE1 | `coralogix.com` | +| EUROPE2 | `eu2.coralogix.com` | Additionally, Coralogix supports AWS PrivateLink, which provides private connectivity between virtual private clouds (VPCs), supported AWS services, and your on-premises networks without exposing your traffic to the public internet. 
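For illustration, a minimal exporter configuration targeting one of these domains might look like the following sketch (the private key and application/subsystem names are placeholders; only the `domain` value comes from the table above):

```yaml
exporters:
  coralogix:
    # Pick the domain matching your Coralogix region, e.g. APAC3:
    domain: "ap3.coralogix.com"
    private_key: "<your-private-key>"
    application_name: "my-app"
    subsystem_name: "my-subsystem"
```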
diff --git a/exporter/dorisexporter/exporter_logs_test.go b/exporter/dorisexporter/exporter_logs_test.go index 8e28aa1ec73b..60097ab6ce4a 100644 --- a/exporter/dorisexporter/exporter_logs_test.go +++ b/exporter/dorisexporter/exporter_logs_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -52,7 +53,7 @@ func TestPushLogData(t *testing.T) { _, _ = w.Write([]byte(`{"Status":"Success"}`)) }) err = server.ListenAndServe() - require.Equal(t, http.ErrServerClosed, err) + assert.Equal(t, http.ErrServerClosed, err) }() err0 := fmt.Errorf("Not Started") diff --git a/exporter/dorisexporter/exporter_traces_test.go b/exporter/dorisexporter/exporter_traces_test.go index aafc08416132..396c320cf2e8 100644 --- a/exporter/dorisexporter/exporter_traces_test.go +++ b/exporter/dorisexporter/exporter_traces_test.go @@ -10,6 +10,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -52,7 +53,7 @@ func TestPushTraceData(t *testing.T) { _, _ = w.Write([]byte(`{"Status":"Success"}`)) }) err = server.ListenAndServe() - require.Equal(t, http.ErrServerClosed, err) + assert.Equal(t, http.ErrServerClosed, err) }() err0 := fmt.Errorf("Not Started") diff --git a/exporter/dorisexporter/factory.go b/exporter/dorisexporter/factory.go index be7259cb4a9e..2acf0c3f36f2 100644 --- a/exporter/dorisexporter/factory.go +++ b/exporter/dorisexporter/factory.go @@ -60,7 +60,7 @@ func createLogsExporter(ctx context.Context, set exporter.Settings, cfg componen exporterhelper.WithStart(exporter.start), exporterhelper.WithShutdown(exporter.shutdown), // we config the timeout option in http client, so we don't need to set timeout here - exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}), exporterhelper.WithQueue(c.QueueSettings), exporterhelper.WithRetry(c.BackOffConfig), ) diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 8bc9de0b5f3f..a27a28ccfe6e 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -395,8 +395,7 @@ func TestConfig_Validate(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { - err := tt.config.Validate() - assert.EqualError(t, err, tt.err) + assert.EqualError(t, component.ValidateConfig(tt.config), tt.err) }) } } @@ -405,13 +404,13 @@ func TestConfig_Validate_Environment(t *testing.T) { t.Run("valid", func(t *testing.T) { t.Setenv("ELASTICSEARCH_URL", "http://test:9200") config := withDefaultConfig() - err := config.Validate() + err := component.ValidateConfig(config) require.NoError(t, err) }) t.Run("invalid", func(t *testing.T) { t.Setenv("ELASTICSEARCH_URL", "http://valid:9200, *:!") config := withDefaultConfig() - err := config.Validate() + err := component.ValidateConfig(config) assert.EqualError(t, err, `invalid endpoint "*:!": parse "*:!": first path segment in URL cannot contain colon`) }) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 2bf4c0250fa4..6f70a9ac6b84 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -42,11 +42,7 @@ func newExporter( 
set exporter.Settings, index string, dynamicIndex bool, -) (*elasticsearchExporter, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } - +) *elasticsearchExporter { model := &encodeModel{ dedot: cfg.Mapping.Dedot, mode: cfg.MappingMode(), @@ -72,7 +68,7 @@ func newExporter( model: model, logstashFormat: cfg.LogstashFormat, otel: otel, - }, nil + } } func (e *elasticsearchExporter) Start(ctx context.Context, host component.Host) error { diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index da97ba7d9f05..3f48ca1e2ec7 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -7,7 +7,6 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "context" - "fmt" "net/http" "time" @@ -113,10 +112,7 @@ func createLogsExporter( } logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) - if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) - } + exporter := newExporter(cf, set, index, cf.LogsDynamicIndex.Enabled) return exporterhelper.NewLogsExporter( ctx, @@ -135,10 +131,8 @@ func createMetricsExporter( cf := cfg.(*Config) logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) - if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) - } + exporter := newExporter(cf, set, cf.MetricsIndex, cf.MetricsDynamicIndex.Enabled) + return exporterhelper.NewMetricsExporter( ctx, set, @@ -150,15 +144,13 @@ func createMetricsExporter( func createTracesExporter(ctx context.Context, set exporter.Settings, - cfg component.Config) (exporter.Traces, error) { - + cfg component.Config, +) (exporter.Traces, error) { cf := cfg.(*Config) logConfigDeprecationWarnings(cf, set.Logger) - exporter, err := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) - if err != nil { - return nil, fmt.Errorf("cannot configure Elasticsearch exporter: %w", err) - } + exporter := newExporter(cf, set, cf.TracesIndex, cf.TracesDynamicIndex.Enabled) + return exporterhelper.NewTracesExporter( ctx, set, diff --git a/exporter/elasticsearchexporter/factory_test.go b/exporter/elasticsearchexporter/factory_test.go index a6e2c356d981..5acc985e008e 100644 --- a/exporter/elasticsearchexporter/factory_test.go +++ b/exporter/elasticsearchexporter/factory_test.go @@ -35,15 +35,6 @@ func TestFactory_CreateLogsExporter(t *testing.T) { require.NoError(t, exporter.Shutdown(context.Background())) } -func TestFactory_CreateLogsExporter_Fail(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - params := exportertest.NewNopSettings() - _, err := factory.CreateLogsExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a logs exporter") - assert.EqualError(t, err, "cannot configure Elasticsearch exporter: exactly one of [endpoint, endpoints, cloudid] must be specified") -} - func TestFactory_CreateMetricsExporter(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { @@ -70,15 +61,6 @@ func TestFactory_CreateTracesExporter(t *testing.T) { require.NoError(t, exporter.Shutdown(context.Background())) } -func TestFactory_CreateTracesExporter_Fail(t *testing.T) { - factory := NewFactory() - cfg := factory.CreateDefaultConfig() - params := exportertest.NewNopSettings() - _, err := 
factory.CreateTracesExporter(context.Background(), params, cfg) - require.Error(t, err, "expected an error when creating a traces exporter") - assert.EqualError(t, err, "cannot configure Elasticsearch exporter: exactly one of [endpoint, endpoints, cloudid] must be specified") -} - func TestFactory_CreateLogsAndTracesExporterWithDeprecatedIndexOption(t *testing.T) { factory := NewFactory() cfg := withDefaultConfig(func(cfg *Config) { diff --git a/exporter/fileexporter/README.md b/exporter/fileexporter/README.md index ec550717b6d9..422e3cf6aeae 100644 --- a/exporter/fileexporter/README.md +++ b/exporter/fileexporter/README.md @@ -13,6 +13,10 @@ [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib +Writes telemetry data to files on disk. + +Use the [OTLP JSON File receiver](../../receiver/otlpjsonfilereceiver/README.md) to read the data back into the collector (as long as the data was exported using OTLP JSON format). + Exporter supports the following features: + Support for writing pipeline data to a file. diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index c0ea3212cd9e..fc316e0494df 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -24,8 +24,8 @@ The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers. - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup. - `client_id` (default = "sarama"): The client ID to configure the Sarama Kafka client with. The client ID will be used for all produce requests. -- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to. -- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. This option, when set, will take precedence over the default topic. If `topic_from_attribute` is not set, the message's topic will be set to the value of the configuration option `topic` instead. +- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the default kafka topic to export to. See [Destination Topic](#destination-topic) below for more details. +- `topic_from_attribute` (default = ""): Specify the resource attribute whose value should be used as the message's topic. See [Destination Topic](#destination-topic) below for more details. - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings: - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. - `otlp_json`: payload is JSON serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. @@ -105,3 +105,9 @@ exporters: - localhost:9092 protocol_version: 2.0.0 ``` + +## Destination Topic +The destination topic can be defined in a few different ways and takes priority in the following order: +1. When `topic_from_attribute` is configured, and the corresponding attribute is found on the ingested data, the value of this attribute is used. +2. 
If a prior component in the collector pipeline sets the topic on the context via the `topic.WithTopic` function (from the `github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic` package), the value set in the context is used. +3. Finally, the `topic` configuration is used as a default/fallback destination. diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index ac5887ef94f7..049314db60e7 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -10,6 +10,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.109.0 @@ -106,6 +107,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal => ../../pkg/batchpersignal +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic + replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger => ../../pkg/translator/jaeger retract ( diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 845129b5b24e..fe3130cecc9c 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -19,6 +19,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic" ) var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") @@ -40,8 +41,8 @@ func (ke kafkaErrors) Error() string { return fmt.Sprintf("Failed to deliver %d messages due to %s", ke.count, ke.err) } -func (e *kafkaTracesProducer) tracesPusher(_ context.Context, td ptrace.Traces) error { - messages, err := e.marshaler.Marshal(td, getTopic(&e.cfg, td.ResourceSpans())) +func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error { + messages, err := e.marshaler.Marshal(td, getTopic(ctx, &e.cfg, td.ResourceSpans())) if err != nil { return consumererror.NewPermanent(err) } @@ -98,8 +99,8 @@ type kafkaMetricsProducer struct { logger *zap.Logger } -func (e *kafkaMetricsProducer) metricsDataPusher(_ context.Context, md pmetric.Metrics) error { - messages, err := e.marshaler.Marshal(md, getTopic(&e.cfg, md.ResourceMetrics())) +func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error { + messages, err := e.marshaler.Marshal(md, getTopic(ctx, &e.cfg, md.ResourceMetrics())) if err != nil { return consumererror.NewPermanent(err) } @@ -156,8 +157,8 @@ type kafkaLogsProducer struct { logger *zap.Logger } -func (e *kafkaLogsProducer) logsDataPusher(_ context.Context, ld plog.Logs) error { - messages, err := e.marshaler.Marshal(ld, getTopic(&e.cfg, ld.ResourceLogs())) +func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error { + messages, err := e.marshaler.Marshal(ld, getTopic(ctx, &e.cfg, ld.ResourceLogs())) if err != 
nil { return consumererror.NewPermanent(err) } @@ -283,16 +284,19 @@ type resource interface { Resource() pcommon.Resource } -func getTopic[T resource](cfg *Config, resources resourceSlice[T]) string { - if cfg.TopicFromAttribute == "" { - return cfg.Topic - } - for i := 0; i < resources.Len(); i++ { - rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute) - if ok && rv.Str() != "" { - return rv.Str() +func getTopic[T resource](ctx context.Context, cfg *Config, resources resourceSlice[T]) string { + if cfg.TopicFromAttribute != "" { + for i := 0; i < resources.Len(); i++ { + rv, ok := resources.At(i).Resource().Attributes().Get(cfg.TopicFromAttribute) + if ok && rv.Str() != "" { + return rv.Str() + } } } + contextTopic, ok := topic.FromContext(ctx) + if ok { + return contextTopic + } return cfg.Topic } diff --git a/exporter/kafkaexporter/kafka_exporter_test.go b/exporter/kafkaexporter/kafka_exporter_test.go index 0edc6b9a52e5..670318887703 100644 --- a/exporter/kafkaexporter/kafka_exporter_test.go +++ b/exporter/kafkaexporter/kafka_exporter_test.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic" ) func TestNewExporter_err_version(t *testing.T) { @@ -205,6 +206,22 @@ func TestTracesPusher_attr(t *testing.T) { require.NoError(t, err) } +func TestTracesPusher_ctx(t *testing.T) { + c := sarama.NewConfig() + producer := mocks.NewSyncProducer(t, c) + producer.ExpectSendMessageAndSucceed() + + p := kafkaTracesProducer{ + producer: producer, + marshaler: newPdataTracesMarshaler(&ptrace.ProtoMarshaler{}, defaultEncoding, false), + } + t.Cleanup(func() { + require.NoError(t, p.Close(context.Background())) + }) + err := p.tracesPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateTraces(2)) + require.NoError(t, err) +} + func TestTracesPusher_err(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) @@ -271,6 +288,22 @@ func TestMetricsDataPusher_attr(t *testing.T) { require.NoError(t, err) } +func TestMetricsDataPusher_ctx(t *testing.T) { + c := sarama.NewConfig() + producer := mocks.NewSyncProducer(t, c) + producer.ExpectSendMessageAndSucceed() + + p := kafkaMetricsProducer{ + producer: producer, + marshaler: newPdataMetricsMarshaler(&pmetric.ProtoMarshaler{}, defaultEncoding, false), + } + t.Cleanup(func() { + require.NoError(t, p.Close(context.Background())) + }) + err := p.metricsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateMetrics(2)) + require.NoError(t, err) +} + func TestMetricsDataPusher_err(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) @@ -337,6 +370,22 @@ func TestLogsDataPusher_attr(t *testing.T) { require.NoError(t, err) } +func TestLogsDataPusher_ctx(t *testing.T) { + c := sarama.NewConfig() + producer := mocks.NewSyncProducer(t, c) + producer.ExpectSendMessageAndSucceed() + + p := kafkaLogsProducer{ + producer: producer, + marshaler: newPdataLogsMarshaler(&plog.ProtoMarshaler{}, defaultEncoding, false), + } + t.Cleanup(func() { + require.NoError(t, p.Close(context.Background())) + }) + err := p.logsDataPusher(topic.WithTopic(context.Background(), "my_topic"), testdata.GenerateLogs(1)) + require.NoError(t, err) +} + func TestLogsDataPusher_err(t *testing.T) { c := sarama.NewConfig() producer := mocks.NewSyncProducer(t, c) @@ -410,6 +459,7 @@ func Test_GetTopic(t *testing.T) { tests := []struct { name 
string cfg Config + ctx context.Context resource any wantTopic string }{ @@ -419,6 +469,7 @@ func Test_GetTopic(t *testing.T) { TopicFromAttribute: "resource-attr", Topic: "defaultTopic", }, + ctx: topic.WithTopic(context.Background(), "context-topic"), resource: testdata.GenerateMetrics(1).ResourceMetrics(), wantTopic: "resource-attr-val-1", }, @@ -428,6 +479,7 @@ func Test_GetTopic(t *testing.T) { TopicFromAttribute: "resource-attr", Topic: "defaultTopic", }, + ctx: topic.WithTopic(context.Background(), "context-topic"), resource: testdata.GenerateTraces(1).ResourceSpans(), wantTopic: "resource-attr-val-1", }, @@ -437,6 +489,7 @@ func Test_GetTopic(t *testing.T) { TopicFromAttribute: "resource-attr", Topic: "defaultTopic", }, + ctx: topic.WithTopic(context.Background(), "context-topic"), resource: testdata.GenerateLogs(1).ResourceLogs(), wantTopic: "resource-attr-val-1", }, @@ -446,14 +499,58 @@ func Test_GetTopic(t *testing.T) { TopicFromAttribute: "nonexistent_attribute", Topic: "defaultTopic", }, + ctx: context.Background(), + resource: testdata.GenerateMetrics(1).ResourceMetrics(), + wantTopic: "defaultTopic", + }, + + { + name: "Valid metric context, return topic name", + cfg: Config{ + TopicFromAttribute: "nonexistent_attribute", + Topic: "defaultTopic", + }, + ctx: topic.WithTopic(context.Background(), "context-topic"), + resource: testdata.GenerateMetrics(1).ResourceMetrics(), + wantTopic: "context-topic", + }, + { + name: "Valid trace context, return topic name", + cfg: Config{ + TopicFromAttribute: "nonexistent_attribute", + Topic: "defaultTopic", + }, + ctx: topic.WithTopic(context.Background(), "context-topic"), + resource: testdata.GenerateTraces(1).ResourceSpans(), + wantTopic: "context-topic", + }, + { + name: "Valid log context, return topic name", + cfg: Config{ + TopicFromAttribute: "nonexistent_attribute", + Topic: "defaultTopic", + }, + ctx: topic.WithTopic(context.Background(), "context-topic"), + resource: testdata.GenerateLogs(1).ResourceLogs(), + wantTopic: "context-topic", + }, + + { + name: "Attribute not found", + cfg: Config{ + TopicFromAttribute: "nonexistent_attribute", + Topic: "defaultTopic", + }, + ctx: context.Background(), resource: testdata.GenerateMetrics(1).ResourceMetrics(), wantTopic: "defaultTopic", }, { - name: "TopicFromAttribute not set, return default topic", + name: "TopicFromAttribute, return default topic", cfg: Config{ Topic: "defaultTopic", }, + ctx: context.Background(), resource: testdata.GenerateMetrics(1).ResourceMetrics(), wantTopic: "defaultTopic", }, @@ -464,11 +561,11 @@ func Test_GetTopic(t *testing.T) { topic := "" switch r := tests[i].resource.(type) { case pmetric.ResourceMetricsSlice: - topic = getTopic(&tests[i].cfg, r) + topic = getTopic(tests[i].ctx, &tests[i].cfg, r) case ptrace.ResourceSpansSlice: - topic = getTopic(&tests[i].cfg, r) + topic = getTopic(tests[i].ctx, &tests[i].cfg, r) case plog.ResourceLogsSlice: - topic = getTopic(&tests[i].cfg, r) + topic = getTopic(tests[i].ctx, &tests[i].cfg, r) } assert.Equal(t, tests[i].wantTopic, topic) }) diff --git a/pkg/datadog/metadata.yaml b/pkg/datadog/metadata.yaml new file mode 100644 index 000000000000..dae3d0ab2af6 --- /dev/null +++ b/pkg/datadog/metadata.yaml @@ -0,0 +1,3 @@ +status: + codeowners: + active: [mx-psi,dineshg13, liustanley, songy23, mackjmr, ankitpatel96] \ No newline at end of file diff --git a/pkg/kafka/topic/Makefile b/pkg/kafka/topic/Makefile new file mode 100644 index 000000000000..bdd863a203be --- /dev/null +++ b/pkg/kafka/topic/Makefile @@ -0,0 +1 @@ 
+include ../../../Makefile.Common diff --git a/pkg/kafka/topic/README.md b/pkg/kafka/topic/README.md new file mode 100644 index 000000000000..663d969e5d61 --- /dev/null +++ b/pkg/kafka/topic/README.md @@ -0,0 +1,4 @@ +# Kafka Topic Context Accessor + +This module provides helpers for setting and retrieving the Kafka destination topic on a `context.Context`. +See the [kafka exporter readme](../../../exporter/kafkaexporter/README.md#destination-topic) for more details. diff --git a/pkg/kafka/topic/go.mod b/pkg/kafka/topic/go.mod new file mode 100644 index 000000000000..5c27986a6d6e --- /dev/null +++ b/pkg/kafka/topic/go.mod @@ -0,0 +1,3 @@ +module github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic + +go 1.22 diff --git a/pkg/kafka/topic/kafka_ctx.go b/pkg/kafka/topic/kafka_ctx.go new file mode 100644 index 000000000000..603863615fb4 --- /dev/null +++ b/pkg/kafka/topic/kafka_ctx.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package topic // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic" + +import ( + "context" +) + +// WithTopic returns a copy of ctx that carries the given Kafka destination topic. +func WithTopic(ctx context.Context, topic string) context.Context { + return context.WithValue(ctx, topicContextKey{}, topic) +} + +// FromContext returns the Kafka destination topic carried by ctx, if one is set. +func FromContext(ctx context.Context) (string, bool) { + contextTopic, ok := ctx.Value(topicContextKey{}).(string) + return contextTopic, ok +} + +type topicContextKey struct{} diff --git a/pkg/kafka/topic/metadata.yaml b/pkg/kafka/topic/metadata.yaml new file mode 100644 index 000000000000..708ccce63f83 --- /dev/null +++ b/pkg/kafka/topic/metadata.yaml @@ -0,0 +1,6 @@ +type: topic + +status: + class: pkg + codeowners: + active: [pavolloffay, MovieStoreGuy] diff --git a/pkg/ottl/ottlfuncs/func_time.go b/pkg/ottl/ottlfuncs/func_time.go index b6d793cc3e5d..44371a94de6e 100644 --- a/pkg/ottl/ottlfuncs/func_time.go +++ b/pkg/ottl/ottlfuncs/func_time.go @@ -34,6 +34,11 @@ func Time[K any](inputTime ottl.StringGetter[K], format string, location ottl.Op if format == "" { return nil, fmt.Errorf("format cannot be nil") } + gotimeFormat, err := timeutils.StrptimeToGotime(format) + if err != nil { + return nil, err + } + var defaultLocation *string if !location.IsEmpty() { l := location.Get() @@ -44,7 +49,6 @@ func Time[K any](inputTime ottl.StringGetter[K], format string, location ottl.Op if err != nil { return nil, err } - return func(ctx context.Context, tCtx K) (any, error) { t, err := inputTime.Get(ctx, tCtx) if err != nil { @@ -53,7 +57,7 @@ func Time[K any](inputTime ottl.StringGetter[K], format string, location ottl.Op if t == "" { return nil, fmt.Errorf("time cannot be nil") } - timestamp, err := timeutils.ParseStrptime(format, t, loc) + timestamp, err := timeutils.ParseGotime(gotimeFormat, t, loc) if err != nil { return nil, err } diff --git a/pkg/ottl/ottlfuncs/func_time_test.go b/pkg/ottl/ottlfuncs/func_time_test.go index 41e62edaae04..cc9ce2a795f1 100644 --- a/pkg/ottl/ottlfuncs/func_time_test.go +++ b/pkg/ottl/ottlfuncs/func_time_test.go @@ -284,3 +284,194 @@ func Test_TimeFormatError(t *testing.T) { }) } } + +func Benchmark_Time(t *testing.B) { + locationAmericaNewYork, _ := time.LoadLocation("America/New_York") + locationAsiaShanghai, _ := time.LoadLocation("Asia/Shanghai") + + tests := []struct { + name string + time ottl.StringGetter[any] + format string + expected time.Time + location string + }{ + { + name: "simple short form", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2023-04-12", nil + }, + }, + format: 
"%Y-%m-%d", + expected: time.Date(2023, 4, 12, 0, 0, 0, 0, time.Local), + }, + { + name: "simple short form with short year and slashes", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "11/11/11", nil + }, + }, + format: "%d/%m/%y", + expected: time.Date(2011, 11, 11, 0, 0, 0, 0, time.Local), + }, + { + name: "month day year", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "02/04/2023", nil + }, + }, + format: "%m/%d/%Y", + expected: time.Date(2023, 2, 4, 0, 0, 0, 0, time.Local), + }, + { + name: "simple long form", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "July 31, 1993", nil + }, + }, + format: "%B %d, %Y", + expected: time.Date(1993, 7, 31, 0, 0, 0, 0, time.Local), + }, + { + name: "date with timestamp", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "Mar 14 2023 17:02:59", nil + }, + }, + format: "%b %d %Y %H:%M:%S", + expected: time.Date(2023, 3, 14, 17, 02, 59, 0, time.Local), + }, + { + name: "day of the week long form", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "Monday, May 01, 2023", nil + }, + }, + format: "%A, %B %d, %Y", + expected: time.Date(2023, 5, 1, 0, 0, 0, 0, time.Local), + }, + { + name: "short weekday, short month, long format", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "Sat, May 20, 2023", nil + }, + }, + format: "%a, %b %d, %Y", + expected: time.Date(2023, 5, 20, 0, 0, 0, 0, time.Local), + }, + { + name: "short months", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "Feb 15, 2023", nil + }, + }, + format: "%b %d, %Y", + expected: time.Date(2023, 2, 15, 0, 0, 0, 0, time.Local), + }, + { + name: "timestamp with time zone offset", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2023-05-26 12:34:56 HST", nil + }, + }, + format: "%Y-%m-%d %H:%M:%S %Z", + expected: time.Date(2023, 5, 26, 12, 34, 56, 0, time.FixedZone("HST", -10*60*60)), + }, + { + name: "short date with timestamp without time zone offset", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2023-05-26T12:34:56 GMT", nil + }, + }, + format: "%Y-%m-%dT%H:%M:%S %Z", + expected: time.Date(2023, 5, 26, 12, 34, 56, 0, time.FixedZone("GMT", 0)), + }, + { + name: "RFC 3339 in custom format", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2012-11-01T22:08:41+0000 EST", nil + }, + }, + format: "%Y-%m-%dT%H:%M:%S%z %Z", + expected: time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("EST", 0)), + }, + { + name: "RFC 3339 in custom format before 2000", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "1986-10-01T00:17:33 MST", nil + }, + }, + format: "%Y-%m-%dT%H:%M:%S %Z", + expected: time.Date(1986, 10, 01, 00, 17, 33, 00, time.FixedZone("MST", -7*60*60)), + }, + { + name: "no location", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2022/01/01", nil + }, + }, + format: "%Y/%m/%d", + expected: time.Date(2022, 01, 01, 0, 0, 0, 0, time.Local), + }, + { + name: "with location - America", + time: 
&ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2023-05-26 12:34:56", nil + }, + }, + format: "%Y-%m-%d %H:%M:%S", + location: "America/New_York", + expected: time.Date(2023, 5, 26, 12, 34, 56, 0, locationAmericaNewYork), + }, + { + name: "with location - Asia", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "2023-05-26 12:34:56", nil + }, + }, + format: "%Y-%m-%d %H:%M:%S", + location: "Asia/Shanghai", + expected: time.Date(2023, 5, 26, 12, 34, 56, 0, locationAsiaShanghai), + }, + { + name: "RFC 3339 in custom format before 2000, ignore default location", + time: &ottl.StandardStringGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return "1986-10-01T00:17:33 MST", nil + }, + }, + location: "Asia/Shanghai", + format: "%Y-%m-%dT%H:%M:%S %Z", + expected: time.Date(1986, 10, 01, 00, 17, 33, 00, time.FixedZone("MST", -7*60*60)), + }, + } + for _, tt := range tests { + var locOptional ottl.Optional[string] + if tt.location != "" { + locOptional = ottl.NewTestingOptional(tt.location) + } + exprFunc, err := Time(tt.time, tt.format, locOptional) + assert.NoError(t, err) + + t.Run(tt.name, func(t *testing.B) { + result, err := exprFunc(nil, nil) + assert.NoError(t, err) + assert.Equal(t, tt.expected.UnixNano(), result.(time.Time).UnixNano()) + }) + } +} diff --git a/processor/deltatorateprocessor/processor.go b/processor/deltatorateprocessor/processor.go index 8da2382ef0ee..a9ba3aca4697 100644 --- a/processor/deltatorateprocessor/processor.go +++ b/processor/deltatorateprocessor/processor.go @@ -55,33 +55,26 @@ func (dtrp *deltaToRateProcessor) processMetrics(_ context.Context, md pmetric.M dtrp.logger.Info(fmt.Sprintf("Configured metric for rate calculation %s is not a delta sum\n", metric.Name())) continue } - newDoubleDataPointSlice := pmetric.NewNumberDataPointSlice() - dataPoints := metric.Sum().DataPoints() + dataPointSlice := metric.Sum().DataPoints() - for i := 0; i < dataPoints.Len(); i++ { - fromDataPoint := dataPoints.At(i) - newDp := newDoubleDataPointSlice.AppendEmpty() - fromDataPoint.CopyTo(newDp) + for i := 0; i < dataPointSlice.Len(); i++ { + dataPoint := dataPointSlice.At(i) - durationNanos := time.Duration(fromDataPoint.Timestamp() - fromDataPoint.StartTimestamp()) + durationNanos := time.Duration(dataPoint.Timestamp() - dataPoint.StartTimestamp()) var rate float64 - switch fromDataPoint.ValueType() { + switch dataPoint.ValueType() { case pmetric.NumberDataPointValueTypeDouble: - rate = calculateRate(fromDataPoint.DoubleValue(), durationNanos) + rate = calculateRate(dataPoint.DoubleValue(), durationNanos) case pmetric.NumberDataPointValueTypeInt: - rate = calculateRate(float64(fromDataPoint.IntValue()), durationNanos) + rate = calculateRate(float64(dataPoint.IntValue()), durationNanos) default: - return md, consumererror.NewPermanent(fmt.Errorf("invalid data point type:%d", fromDataPoint.ValueType())) + return md, consumererror.NewPermanent(fmt.Errorf("invalid data point type:%d", dataPoint.ValueType())) } - newDp.SetDoubleValue(rate) + dataPoint.SetDoubleValue(rate) } - dps := metric.SetEmptyGauge().DataPoints() - dps.EnsureCapacity(newDoubleDataPointSlice.Len()) - for d := 0; d < newDoubleDataPointSlice.Len(); d++ { - dp := dps.AppendEmpty() - newDoubleDataPointSlice.At(d).CopyTo(dp) - } + // Setting the data type removed all the data points, so we must move them back to the metric. 
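+ // MoveAndAppendTo transfers the existing points into the new gauge and clears the + // source slice, avoiding the per-point AppendEmpty/CopyTo allocations of the removed code.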
+ dataPointSlice.MoveAndAppendTo(metric.SetEmptyGauge().DataPoints()) } } } diff --git a/processor/groupbytraceprocessor/documentation.md b/processor/groupbytraceprocessor/documentation.md index 384939278508..be8bb8183fe8 100644 --- a/processor/groupbytraceprocessor/documentation.md +++ b/processor/groupbytraceprocessor/documentation.md @@ -28,7 +28,7 @@ Releases that are suspected to have been incomplete | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | -| | Sum | Int | true | +| {releases} | Sum | Int | true | ### otelcol_processor_groupbytrace_num_events_in_queue diff --git a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go index b2f4db20c319..58fe789590e6 100644 --- a/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go +++ b/processor/groupbytraceprocessor/internal/metadata/generated_telemetry.go @@ -67,7 +67,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme builder.ProcessorGroupbytraceIncompleteReleases, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_processor_groupbytrace_incomplete_releases", metric.WithDescription("Releases that are suspected to have been incomplete"), - metric.WithUnit(""), + metric.WithUnit("{releases}"), ) errs = errors.Join(errs, err) builder.ProcessorGroupbytraceNumEventsInQueue, err = builder.meters[configtelemetry.LevelBasic].Int64Gauge( diff --git a/processor/groupbytraceprocessor/metadata.yaml b/processor/groupbytraceprocessor/metadata.yaml index ef574c02edbf..0e2fab550e12 100644 --- a/processor/groupbytraceprocessor/metadata.yaml +++ b/processor/groupbytraceprocessor/metadata.yaml @@ -57,6 +57,7 @@ telemetry: processor_groupbytrace_incomplete_releases: enabled: true description: Releases that are suspected to have been incomplete + unit: "{releases}" sum: value_type: int monotonic: true diff --git a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go index 3c3a5100dc3c..3fc352f0faca 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go +++ b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum.go @@ -54,8 +54,8 @@ func convertGaugeToSum(stringAggTemp string, monotonic bool) (ottl.ExprFunc[ottl metric.SetEmptySum().SetAggregationTemporality(aggTemp) metric.Sum().SetIsMonotonic(monotonic) - // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.Sum().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.Sum().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go index dab12e96d094..61c848ab1b52 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go +++ b/processor/transformprocessor/internal/metrics/func_convert_gauge_to_sum_datapoint.go @@ -50,8 +50,8 @@ func convertDatapointGaugeToSum(stringAggTemp string, monotonic bool) (ottl.Expr metric.SetEmptySum().SetAggregationTemporality(aggTemp) metric.Sum().SetIsMonotonic(monotonic) - // Setting the data type removed all the data points, so we must copy them back to the metric. 
- dps.CopyTo(metric.Sum().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.Sum().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go index f4763e65c9e5..212395bd524b 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go +++ b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge.go @@ -30,7 +30,7 @@ func convertSumToGauge() (ottl.ExprFunc[ottlmetric.TransformContext], error) { dps := metric.Sum().DataPoints() // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.SetEmptyGauge().DataPoints()) + dps.MoveAndAppendTo(metric.SetEmptyGauge().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go index ca2f09c8a121..1943d2d9796a 100644 --- a/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go +++ b/processor/transformprocessor/internal/metrics/func_convert_sum_to_gauge_datapoint.go @@ -29,8 +29,8 @@ func convertDatapointSumToGauge() (ottl.ExprFunc[ottldatapoint.TransformContext] dps := metric.Sum().DataPoints() - // Setting the data type removed all the data points, so we must copy them back to the metric. - dps.CopyTo(metric.SetEmptyGauge().DataPoints()) + // Setting the data type removed all the data points, so we must move them back to the metric. + dps.MoveAndAppendTo(metric.SetEmptyGauge().DataPoints()) return nil, nil }, nil diff --git a/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go b/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go index 78b47623cdf3..f002260944ac 100644 --- a/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go +++ b/processor/transformprocessor/internal/metrics/func_extract_sum_metric.go @@ -32,7 +32,7 @@ func createExtractSumMetricFunction(_ ottl.FunctionContext, oArgs ottl.Arguments return extractSumMetric(args.Monotonic) } -// this interface helps unify the logic for extracting data from different histogram types +// SumCountDataPoint interface helps unify the logic for extracting data from different histogram types // all supported metric types' datapoints implement it type SumCountDataPoint interface { Attributes() pcommon.Map diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 8decd129de0a..55b676e3ccf0 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -69,6 +69,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.109.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.109.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -122,6 +123,8 @@ retract ( v0.65.0 ) +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic => ../../pkg/kafka/topic + replace 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest diff --git a/testbed/datasenders/syslog.go b/testbed/datasenders/syslog.go index 3417d9bbc390..24cec672a6c7 100644 --- a/testbed/datasenders/syslog.go +++ b/testbed/datasenders/syslog.go @@ -91,14 +91,20 @@ func (f *SyslogWriter) GenConfigYAMLStr() string { func (f *SyslogWriter) Send(lr plog.LogRecord) error { ts := time.Unix(int64(lr.Timestamp()/1_000_000_000), int64(lr.Timestamp()%1_000_000_000)).Format(time.RFC3339Nano) sdid := strings.Builder{} - sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "trace_id", lr.TraceID())) - sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "span_id", lr.SpanID())) + if lr.TraceID().String() != "" { + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "trace_id", lr.TraceID())) + } + if lr.SpanID().String() != "" { + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "span_id", lr.SpanID())) + } sdid.WriteString(fmt.Sprintf("%s=\"%d\" ", "trace_flags", lr.Flags())) lr.Attributes().Range(func(k string, v pcommon.Value) bool { - sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", k, v.Str())) + if v.Str() != "" { + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", k, v.Str())) + } return true }) - msg := fmt.Sprintf("<166> %s 127.0.0.1 - - - [%s] %s\n", ts, sdid.String(), lr.Body().Str()) + msg := fmt.Sprintf("<166>1 %s 127.0.0.1 - - - [test@12345 %s] %s\n", ts, strings.TrimSpace(sdid.String()), lr.Body().Str()) f.buf = append(f.buf, msg) return f.SendCheck() diff --git a/versions.yaml b/versions.yaml index 27479bf26abd..c63af9fe4b11 100644 --- a/versions.yaml +++ b/versions.yaml @@ -142,6 +142,7 @@ module-sets: - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden + - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
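As a usage sketch for the new `pkg/kafka/topic` helpers introduced in this change (the program and the `tenant-a-logs` topic name are illustrative, not part of the diff):

```go
package main

import (
	"context"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic"
)

func main() {
	// A component earlier in the pipeline stores the destination topic on the
	// context before handing data to the kafka exporter.
	ctx := topic.WithTopic(context.Background(), "tenant-a-logs")

	// The exporter's getTopic reads it back; per the README, a matching
	// topic_from_attribute resource attribute still takes precedence, and the
	// `topic` config remains the fallback when neither is present.
	if name, ok := topic.FromContext(ctx); ok {
		fmt.Println("destination topic:", name) // destination topic: tenant-a-logs
	}
}
```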