From 94d0d37b0cc167a317f1c202fd02957bcae2f06d Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 16 Dec 2024 14:02:59 +0000 Subject: [PATCH 01/11] [connector/signatometrics]Add core logic for signal to metrics --- .../signaltometricsconnector/connector.go | 241 ++++++- .../connector_test.go | 323 +++++++++ connector/signaltometricsconnector/factory.go | 75 +- connector/signaltometricsconnector/go.mod | 6 +- connector/signaltometricsconnector/go.sum | 2 + .../internal/aggregator/aggregator.go | 305 ++++++++ .../internal/aggregator/exphistogram.go | 63 ++ .../internal/aggregator/explicithistogram.go | 61 ++ .../internal/aggregator/sumdp.go | 55 ++ .../internal/aggregator/valuecountdp.go | 59 ++ .../internal/model/model.go | 245 +++++++ .../logs/exponential_histograms/config.yaml | 40 ++ .../logs/exponential_histograms/output.yaml | 473 ++++++++++++ .../testdata/logs/histograms/config.yaml | 45 ++ .../testdata/logs/histograms/output.yaml | 154 ++++ .../testdata/logs/logs.yaml | 63 ++ .../testdata/logs/sum/config.yaml | 35 + .../testdata/logs/sum/output.yaml | 70 ++ .../exponential_histograms/config.yaml | 13 + .../exponential_histograms/output.yaml | 288 ++++++++ .../testdata/metrics/histograms/config.yaml | 15 + .../testdata/metrics/histograms/output.yaml | 63 ++ .../testdata/metrics/metrics.yaml | 226 ++++++ .../testdata/metrics/sum/config.yaml | 29 + .../testdata/metrics/sum/output.yaml | 54 ++ .../traces/exponential_histograms/config.yaml | 54 ++ .../traces/exponential_histograms/output.yaml | 677 ++++++++++++++++++ .../testdata/traces/histograms/config.yaml | 53 ++ .../testdata/traces/histograms/output.yaml | 254 +++++++ .../testdata/traces/sum/config.yaml | 47 ++ .../testdata/traces/sum/output.yaml | 74 ++ 31 files changed, 4138 insertions(+), 24 deletions(-) create mode 100644 connector/signaltometricsconnector/connector_test.go create mode 100644 connector/signaltometricsconnector/internal/aggregator/aggregator.go create mode 100644 connector/signaltometricsconnector/internal/aggregator/exphistogram.go create mode 100644 connector/signaltometricsconnector/internal/aggregator/explicithistogram.go create mode 100644 connector/signaltometricsconnector/internal/aggregator/sumdp.go create mode 100644 connector/signaltometricsconnector/internal/aggregator/valuecountdp.go create mode 100644 connector/signaltometricsconnector/internal/model/model.go create mode 100644 connector/signaltometricsconnector/testdata/logs/exponential_histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/exponential_histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/logs.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/sum/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/logs/sum/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/exponential_histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/exponential_histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/metrics.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/sum/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/metrics/sum/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/exponential_histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/exponential_histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/histograms/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/histograms/output.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/sum/config.yaml create mode 100644 connector/signaltometricsconnector/testdata/traces/sum/output.yaml diff --git a/connector/signaltometricsconnector/connector.go b/connector/signaltometricsconnector/connector.go index d6f5f5b8283b..fa314ece6bd8 100644 --- a/connector/signaltometricsconnector/connector.go +++ b/connector/signaltometricsconnector/connector.go @@ -5,46 +5,253 @@ package signaltometricsconnector // import "github.com/open-telemetry/openteleme import ( "context" + "errors" + "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" ) type signalToMetrics struct { next consumer.Metrics logger *zap.Logger + spanMetricDefs []model.MetricDef[ottlspan.TransformContext] + dpMetricDefs []model.MetricDef[ottldatapoint.TransformContext] + logMetricDefs []model.MetricDef[ottllog.TransformContext] + component.StartFunc component.ShutdownFunc } -func newSignalToMetrics( - set connector.Settings, - next consumer.Metrics, -) *signalToMetrics { - return &signalToMetrics{ - logger: set.Logger, - next: next, - } -} - func (sm *signalToMetrics) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: false} } -func (sm *signalToMetrics) ConsumeTraces(context.Context, ptrace.Traces) error { - return nil +func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + if len(sm.spanMetricDefs) == 0 { + return nil + } + + var multiError error + processedMetrics := pmetric.NewMetrics() + processedMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len()) + aggregator := aggregator.NewAggregator[ottlspan.TransformContext](processedMetrics) + + for i := 0; i < td.ResourceSpans().Len(); i++ { + resourceSpan := td.ResourceSpans().At(i) + resourceAttrs := resourceSpan.Resource().Attributes() + for j := 0; j < resourceSpan.ScopeSpans().Len(); j++ { + scopeSpan := resourceSpan.ScopeSpans().At(j) + for k := 0; k < scopeSpan.Spans().Len(); k++ { + span := scopeSpan.Spans().At(k) + spanAttrs := span.Attributes() + for _, md := range sm.spanMetricDefs { + filteredSpanAttrs, ok := md.FilterAttributes(spanAttrs) + if !ok { + continue + } + + // The transform context is created from orginal attributes so that the + // OTTL expressions are also applied on the original attributes. + tCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) + if md.Conditions != nil { + match, err := md.Conditions.Eval(ctx, tCtx) + if err != nil { + multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) + continue + } + if !match { + sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) + continue + } + } + + filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) + multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1)) + } + } + } + } + aggregator.Finalize(sm.spanMetricDefs) + return sm.processNext(ctx, processedMetrics, multiError) } -func (sm *signalToMetrics) ConsumeMetrics(context.Context, pmetric.Metrics) error { - return nil +func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error { + if len(sm.dpMetricDefs) == 0 { + return nil + } + + var multiError error + processedMetrics := pmetric.NewMetrics() + processedMetrics.ResourceMetrics().EnsureCapacity(m.ResourceMetrics().Len()) + aggregator := aggregator.NewAggregator[ottldatapoint.TransformContext](processedMetrics) + for i := 0; i < m.ResourceMetrics().Len(); i++ { + resourceMetric := m.ResourceMetrics().At(i) + resourceAttrs := resourceMetric.Resource().Attributes() + for j := 0; j < resourceMetric.ScopeMetrics().Len(); j++ { + scopeMetric := resourceMetric.ScopeMetrics().At(j) + for k := 0; k < scopeMetric.Metrics().Len(); k++ { + metrics := scopeMetric.Metrics() + metric := metrics.At(k) + for _, md := range sm.dpMetricDefs { + filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) + aggregate := func(dp any, dpAttrs pcommon.Map) error { + // The transform context is created from orginal attributes so that the + // OTTL expressions are also applied on the original attributes. + tCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scopeMetric.Scope(), resourceMetric.Resource(), scopeMetric, resourceMetric) + if md.Conditions != nil { + match, err := md.Conditions.Eval(ctx, tCtx) + if err != nil { + multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) + return nil + } + if !match { + sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) + return nil + } + } + return aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, dpAttrs, 1) + } + + //exhaustive:enforce + switch metric.Type() { + case pmetric.MetricTypeGauge: + dps := metric.Gauge().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) + if !ok { + continue + } + multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + } + case pmetric.MetricTypeSum: + dps := metric.Sum().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) + if !ok { + continue + } + multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + } + case pmetric.MetricTypeSummary: + dps := metric.Summary().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) + if !ok { + continue + } + multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + } + case pmetric.MetricTypeHistogram: + dps := metric.Histogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) + if !ok { + continue + } + multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + } + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dp := dps.At(l) + filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes()) + if !ok { + continue + } + multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + } + case pmetric.MetricTypeEmpty: + multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type())) + } + } + } + } + } + aggregator.Finalize(sm.dpMetricDefs) + return sm.processNext(ctx, processedMetrics, multiError) } -func (sm *signalToMetrics) ConsumeLogs(context.Context, plog.Logs) error { - return nil +func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) error { + if len(sm.logMetricDefs) == 0 { + return nil + } + + var multiError error + processedMetrics := pmetric.NewMetrics() + processedMetrics.ResourceMetrics().EnsureCapacity(logs.ResourceLogs().Len()) + aggregator := aggregator.NewAggregator[ottllog.TransformContext](processedMetrics) + for i := 0; i < logs.ResourceLogs().Len(); i++ { + resourceLog := logs.ResourceLogs().At(i) + resourceAttrs := resourceLog.Resource().Attributes() + for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { + scopeLog := resourceLog.ScopeLogs().At(j) + for k := 0; k < scopeLog.LogRecords().Len(); k++ { + log := scopeLog.LogRecords().At(k) + logAttrs := log.Attributes() + for _, md := range sm.logMetricDefs { + filteredLogAttrs, ok := md.FilterAttributes(logAttrs) + if !ok { + continue + } + + // The transform context is created from orginal attributes so that the + // OTTL expressions are also applied on the original attributes. + tCtx := ottllog.NewTransformContext(log, scopeLog.Scope(), resourceLog.Resource(), scopeLog, resourceLog) + if md.Conditions != nil { + match, err := md.Conditions.Eval(ctx, tCtx) + if err != nil { + multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) + continue + } + if !match { + sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) + continue + } + } + + filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) + multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1)) + } + } + } + } + aggregator.Finalize(sm.logMetricDefs) + return sm.processNext(ctx, processedMetrics, multiError) +} + +// processNext is a helper method for all the Consume* methods to do error handling, +// logging, and sending the processed metrics to the next consumer in the pipeline. +func (sm *signalToMetrics) processNext(ctx context.Context, m pmetric.Metrics, err error) error { + if err != nil { + dpCount := m.DataPointCount() + if dpCount == 0 { + // No signals were consumed so return an error + return fmt.Errorf("failed to consume signal: %w", err) + } + // At least some signals were partially consumed, so log the error + // and pass the processed metrics to the next consumer. + sm.logger.Warn( + "failed to consume all signals, some signals were partially processed", + zap.Error(err), + zap.Int("successful_data_points", dpCount), + ) + } + return sm.next.ConsumeMetrics(ctx, m) } diff --git a/connector/signaltometricsconnector/connector_test.go b/connector/signaltometricsconnector/connector_test.go new file mode 100644 index 000000000000..a7a7a3046c42 --- /dev/null +++ b/connector/signaltometricsconnector/connector_test.go @@ -0,0 +1,323 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package signaltometricsconnector + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/confmap/confmaptest" + "go.opentelemetry.io/collector/connector" + "go.opentelemetry.io/collector/connector/connectortest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.26.0" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" +) + +const testDataDir = "testdata" + +func TestConnectorWithTraces(t *testing.T) { + testCases := []string{ + "sum", + "histograms", + "exponential_histograms", + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, tc := range testCases { + t.Run(tc, func(t *testing.T) { + traceTestDataDir := filepath.Join(testDataDir, "traces") + inputTraces, err := golden.ReadTraces(filepath.Join(traceTestDataDir, "traces.yaml")) + require.NoError(t, err) + + next := &consumertest.MetricsSink{} + tcTestDataDir := filepath.Join(traceTestDataDir, tc) + factory, settings, cfg := setupConnector(t, tcTestDataDir) + connector, err := factory.CreateTracesToMetrics(ctx, settings, cfg, next) + require.NoError(t, err) + require.IsType(t, &signalToMetrics{}, connector) + expectedMetrics, err := golden.ReadMetrics(filepath.Join(tcTestDataDir, "output.yaml")) + require.NoError(t, err) + + require.NoError(t, connector.ConsumeTraces(ctx, inputTraces)) + require.Len(t, next.AllMetrics(), 1) + assertAggregatedMetrics(t, expectedMetrics, next.AllMetrics()[0]) + }) + } +} + +func TestConnectorWithMetrics(t *testing.T) { + testCases := []string{ + "sum", + "histograms", + "exponential_histograms", + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, tc := range testCases { + t.Run(tc, func(t *testing.T) { + metricTestDataDir := filepath.Join(testDataDir, "metrics") + inputMetrics, err := golden.ReadMetrics(filepath.Join(metricTestDataDir, "metrics.yaml")) + require.NoError(t, err) + + next := &consumertest.MetricsSink{} + tcTestDataDir := filepath.Join(metricTestDataDir, tc) + factory, settings, cfg := setupConnector(t, tcTestDataDir) + connector, err := factory.CreateMetricsToMetrics(ctx, settings, cfg, next) + require.NoError(t, err) + require.IsType(t, &signalToMetrics{}, connector) + expectedMetrics, err := golden.ReadMetrics(filepath.Join(tcTestDataDir, "output.yaml")) + require.NoError(t, err) + + require.NoError(t, connector.ConsumeMetrics(ctx, inputMetrics)) + require.Len(t, next.AllMetrics(), 1) + assertAggregatedMetrics(t, expectedMetrics, next.AllMetrics()[0]) + }) + } +} + +func TestConnectorWithLogs(t *testing.T) { + testCases := []string{ + "sum", + "histograms", + "exponential_histograms", + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + for _, tc := range testCases { + t.Run(tc, func(t *testing.T) { + logTestDataDir := filepath.Join(testDataDir, "logs") + inputLogs, err := golden.ReadLogs(filepath.Join(logTestDataDir, "logs.yaml")) + require.NoError(t, err) + + next := &consumertest.MetricsSink{} + tcTestDataDir := filepath.Join(logTestDataDir, tc) + factory, settings, cfg := setupConnector(t, tcTestDataDir) + connector, err := factory.CreateLogsToMetrics(ctx, settings, cfg, next) + require.NoError(t, err) + require.IsType(t, &signalToMetrics{}, connector) + expectedMetrics, err := golden.ReadMetrics(filepath.Join(tcTestDataDir, "output.yaml")) + require.NoError(t, err) + + require.NoError(t, connector.ConsumeLogs(ctx, inputLogs)) + require.Len(t, next.AllMetrics(), 1) + assertAggregatedMetrics(t, expectedMetrics, next.AllMetrics()[0]) + }) + } +} + +func BenchmarkConnectorWithTraces(b *testing.B) { + factory := NewFactory() + settings := connectortest.NewNopSettings() + settings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zapcore.DebugLevel)) + next, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + return nil + }) + require.NoError(b, err) + + cfg := &config.Config{Spans: testMetricInfo(b)} + require.NoError(b, cfg.Unmarshal(confmap.New())) // set required fields to default + require.NoError(b, cfg.Validate()) + connector, err := factory.CreateTracesToMetrics(context.Background(), settings, cfg, next) + require.NoError(b, err) + inputTraces, err := golden.ReadTraces("testdata/traces/traces.yaml") + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := connector.ConsumeTraces(context.Background(), inputTraces); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkConnectorWithMetrics(b *testing.B) { + factory := NewFactory() + settings := connectortest.NewNopSettings() + settings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zapcore.DebugLevel)) + next, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + return nil + }) + require.NoError(b, err) + + cfg := &config.Config{Datapoints: testMetricInfo(b)} + require.NoError(b, cfg.Unmarshal(confmap.New())) // set required fields to default + require.NoError(b, cfg.Validate()) + connector, err := factory.CreateMetricsToMetrics(context.Background(), settings, cfg, next) + require.NoError(b, err) + inputMetrics, err := golden.ReadMetrics("testdata/metrics/metrics.yaml") + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := connector.ConsumeMetrics(context.Background(), inputMetrics); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkConnectorWithLogs(b *testing.B) { + factory := NewFactory() + settings := connectortest.NewNopSettings() + settings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zapcore.DebugLevel)) + next, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + return nil + }) + require.NoError(b, err) + + cfg := &config.Config{Logs: testMetricInfo(b)} + require.NoError(b, cfg.Unmarshal(confmap.New())) // set required fields to default + require.NoError(b, cfg.Validate()) + connector, err := factory.CreateLogsToMetrics(context.Background(), settings, cfg, next) + require.NoError(b, err) + inputLogs, err := golden.ReadLogs("testdata/logs/logs.yaml") + require.NoError(b, err) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if err := connector.ConsumeLogs(context.Background(), inputLogs); err != nil { + b.Fatal(err) + } + } +} + +// testMetricInfo creates a metric info with all metric types that could be used +// for all the supported signals. To do this, it uses common OTTL funcs and literals. +func testMetricInfo(b *testing.B) []config.MetricInfo { + b.Helper() + + return []config.MetricInfo{ + { + Name: "test.histogram", + Description: "Test histogram", + Unit: "ms", + IncludeResourceAttributes: []config.Attribute{ + { + Key: "resource.foo", + }, + { + Key: "404.attribute", + DefaultValue: "test_404_attribute", + }, + }, + Attributes: []config.Attribute{ + { + Key: "http.response.status_code", + }, + }, + Histogram: &config.Histogram{ + Buckets: []float64{2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000}, + Value: "1.4", + }, + }, + { + Name: "test.exphistogram", + Description: "Test exponential histogram", + Unit: "ms", + IncludeResourceAttributes: []config.Attribute{ + { + Key: "resource.foo", + }, + { + Key: "404.attribute", + DefaultValue: "test_404_attribute", + }, + }, + Attributes: []config.Attribute{ + { + Key: "http.response.status_code", + }, + }, + ExponentialHistogram: &config.ExponentialHistogram{ + Value: "2.4", + MaxSize: 160, + }, + }, + { + Name: "test.sum", + Description: "Test sum", + Unit: "ms", + IncludeResourceAttributes: []config.Attribute{ + { + Key: "resource.foo", + }, + { + Key: "404.attribute", + DefaultValue: "test_404_attribute", + }, + }, + Attributes: []config.Attribute{ + { + Key: "http.response.status_code", + }, + }, + Sum: &config.Sum{ + Value: "5.4", + }, + }, + } +} + +func setupConnector( + t *testing.T, testFilePath string, +) (connector.Factory, connector.Settings, component.Config) { + t.Helper() + factory := NewFactory() + settings := connectortest.NewNopSettings() + telemetryResource(t).CopyTo(settings.TelemetrySettings.Resource) + settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)) + + cfg := createDefaultConfig() + cm, err := confmaptest.LoadConf(filepath.Join(testFilePath, "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "").String()) + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(&cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + return factory, settings, cfg +} + +func telemetryResource(t *testing.T) pcommon.Resource { + t.Helper() + + r := pcommon.NewResource() + r.Attributes().PutStr(semconv.AttributeServiceInstanceID, "627cc493-f310-47de-96bd-71410b7dec09") + r.Attributes().PutStr(semconv.AttributeServiceName, "signaltometrics") + r.Attributes().PutStr(semconv.AttributeServiceNamespace, "test") + return r +} + +func assertAggregatedMetrics(t *testing.T, expected, actual pmetric.Metrics) bool { + t.Helper() + return assert.NoError(t, pmetrictest.CompareMetrics( + expected, actual, + pmetrictest.IgnoreMetricDataPointsOrder(), + pmetrictest.IgnoreMetricsOrder(), + pmetrictest.IgnoreTimestamp(), + )) +} diff --git a/connector/signaltometricsconnector/factory.go b/connector/signaltometricsconnector/factory.go index 4bf1fcb8d1a3..a31a3e7b50da 100644 --- a/connector/signaltometricsconnector/factory.go +++ b/connector/signaltometricsconnector/factory.go @@ -5,13 +5,19 @@ package signaltometricsconnector // import "github.com/open-telemetry/openteleme import ( "context" + "fmt" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/customottl" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoint" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspan" ) // NewFactory returns a ConnectorFactory. @@ -32,26 +38,83 @@ func createDefaultConfig() component.Config { func createTracesToMetrics( _ context.Context, set connector.Settings, - _ component.Config, + cfg component.Config, nextConsumer consumer.Metrics, ) (connector.Traces, error) { - return newSignalToMetrics(set, nextConsumer), nil + c := cfg.(*config.Config) + parser, err := ottlspan.NewParser(customottl.SpanFuncs(), set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("failed to create OTTL statement parser for datapoints: %w", err) + } + + metricDefs := make([]model.MetricDef[ottlspan.TransformContext], 0, len(c.Spans)) + for _, info := range c.Spans { + var md model.MetricDef[ottlspan.TransformContext] + if err := md.FromMetricInfo(info, parser, set.TelemetrySettings); err != nil { + return nil, fmt.Errorf("failed to parse provided metric information; %w", err) + } + metricDefs = append(metricDefs, md) + } + + return &signalToMetrics{ + logger: set.Logger, + next: nextConsumer, + spanMetricDefs: metricDefs, + }, nil } func createMetricsToMetrics( _ context.Context, set connector.Settings, - _ component.Config, + cfg component.Config, nextConsumer consumer.Metrics, ) (connector.Metrics, error) { - return newSignalToMetrics(set, nextConsumer), nil + c := cfg.(*config.Config) + parser, err := ottldatapoint.NewParser(customottl.DatapointFuncs(), set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("failed to create OTTL statement parser for datapoints: %w", err) + } + + metricDefs := make([]model.MetricDef[ottldatapoint.TransformContext], 0, len(c.Datapoints)) + for _, info := range c.Datapoints { + var md model.MetricDef[ottldatapoint.TransformContext] + if err := md.FromMetricInfo(info, parser, set.TelemetrySettings); err != nil { + return nil, fmt.Errorf("failed to parse provided metric information; %w", err) + } + metricDefs = append(metricDefs, md) + } + + return &signalToMetrics{ + logger: set.Logger, + next: nextConsumer, + dpMetricDefs: metricDefs, + }, nil } func createLogsToMetrics( _ context.Context, set connector.Settings, - _ component.Config, + cfg component.Config, nextConsumer consumer.Metrics, ) (connector.Logs, error) { - return newSignalToMetrics(set, nextConsumer), nil + c := cfg.(*config.Config) + parser, err := ottllog.NewParser(customottl.LogFuncs(), set.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("failed to create OTTL statement parser for datapoints: %w", err) + } + + metricDefs := make([]model.MetricDef[ottllog.TransformContext], 0, len(c.Logs)) + for _, info := range c.Logs { + var md model.MetricDef[ottllog.TransformContext] + if err := md.FromMetricInfo(info, parser, set.TelemetrySettings); err != nil { + return nil, fmt.Errorf("failed to parse provided metric information; %w", err) + } + metricDefs = append(metricDefs, md) + } + + return &signalToMetrics{ + logger: set.Logger, + next: nextConsumer, + logMetricDefs: metricDefs, + }, nil } diff --git a/connector/signaltometricsconnector/go.mod b/connector/signaltometricsconnector/go.mod index 250acc98656f..452a144f9247 100644 --- a/connector/signaltometricsconnector/go.mod +++ b/connector/signaltometricsconnector/go.mod @@ -4,7 +4,10 @@ go 1.22.0 require ( github.com/lightstep/go-expohisto v1.0.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.115.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.115.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.115.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 @@ -16,6 +19,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.115.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/pdata v1.21.1-0.20241206185113-3f3e208e71b8 go.opentelemetry.io/collector/pipeline v0.115.1-0.20241206185113-3f3e208e71b8 + go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -24,6 +28,7 @@ require ( github.com/alecthomas/participle/v2 v2.1.1 // indirect github.com/antchfx/xmlquery v1.4.2 // indirect github.com/antchfx/xpath v1.3.2 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/elastic/go-grok v0.3.1 // indirect github.com/elastic/lunes v0.1.0 // indirect @@ -55,7 +60,6 @@ require ( go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.115.1-0.20241206185113-3f3e208e71b8 // indirect - go.opentelemetry.io/collector/semconv v0.115.1-0.20241206185113-3f3e208e71b8 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect diff --git a/connector/signaltometricsconnector/go.sum b/connector/signaltometricsconnector/go.sum index 87c044bc0c9e..25c153de5259 100644 --- a/connector/signaltometricsconnector/go.sum +++ b/connector/signaltometricsconnector/go.sum @@ -8,6 +8,8 @@ github.com/antchfx/xmlquery v1.4.2 h1:MZKd9+wblwxfQ1zd1AdrTsqVaMjMCwow3IqkCSe00K github.com/antchfx/xmlquery v1.4.2/go.mod h1:QXhvf5ldTuGqhd1SHNvvtlhhdQLks4dD0awIVhXIDTA= github.com/antchfx/xpath v1.3.2 h1:LNjzlsSjinu3bQpw9hWMY9ocB80oLOWuQqFvO6xt51U= github.com/antchfx/xpath v1.3.2/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/connector/signaltometricsconnector/internal/aggregator/aggregator.go b/connector/signaltometricsconnector/internal/aggregator/aggregator.go new file mode 100644 index 000000000000..a633811ffb93 --- /dev/null +++ b/connector/signaltometricsconnector/internal/aggregator/aggregator.go @@ -0,0 +1,305 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil" +) + +// Aggregator provides a single interface to update all metrics +// datastructures. The required datastructure is selected using +// the metric definition. +type Aggregator[K any] struct { + result pmetric.Metrics + // smLookup maps resourceID against scope metrics since the aggregator + // always produces a single scope. + smLookup map[[16]byte]pmetric.ScopeMetrics + valueCounts map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP + sums map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP + timestamp time.Time +} + +// NewAggregator creates a new instance of aggregator. +func NewAggregator[K any](metrics pmetric.Metrics) *Aggregator[K] { + return &Aggregator[K]{ + result: metrics, + smLookup: make(map[[16]byte]pmetric.ScopeMetrics), + valueCounts: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*valueCountDP), + sums: make(map[model.MetricKey]map[[16]byte]map[[16]byte]*sumDP), + timestamp: time.Now(), + } +} + +func (a *Aggregator[K]) Aggregate( + ctx context.Context, + tCtx K, + md model.MetricDef[K], + resAttrs, srcAttrs pcommon.Map, + defaultCount int64, +) error { + switch { + case md.ExponentialHistogram != nil: + val, count, err := getValueCount( + ctx, tCtx, + md.ExponentialHistogram.Value, + md.ExponentialHistogram.Count, + defaultCount, + ) + if err != nil { + return err + } + return a.aggregateValueCount(md, resAttrs, srcAttrs, val, count) + case md.ExplicitHistogram != nil: + val, count, err := getValueCount( + ctx, tCtx, + md.ExplicitHistogram.Value, + md.ExplicitHistogram.Count, + defaultCount, + ) + if err != nil { + return err + } + return a.aggregateValueCount(md, resAttrs, srcAttrs, val, count) + case md.Sum != nil: + raw, _, err := md.Sum.Value.Execute(ctx, tCtx) + if err != nil { + return fmt.Errorf("failed to execute OTTL value for sum: %w", err) + } + switch v := raw.(type) { + case int64: + return a.aggregateInt(md, resAttrs, srcAttrs, v) + case float64: + return a.aggregateDouble(md, resAttrs, srcAttrs, v) + default: + return fmt.Errorf( + "failed to parse sum OTTL value of type %T into int64 or float64: %v", + v, v, + ) + } + } + return nil +} + +// Finalize finalizes the aggregations performed by the aggregator so far into +// the pmetric.Metrics used to create this instance of the aggregator. Finalize +// should be called once per aggregator instance and the aggregator instance +// should not be used after Finalize is called. +func (a *Aggregator[K]) Finalize(mds []model.MetricDef[K]) { + for _, md := range mds { + for resID, dpMap := range a.valueCounts[md.Key] { + metrics := a.smLookup[resID].Metrics() + var ( + destExpHist pmetric.ExponentialHistogram + destExplicitHist pmetric.Histogram + ) + switch { + case md.ExponentialHistogram != nil: + destMetric := metrics.AppendEmpty() + destMetric.SetName(md.Key.Name) + destMetric.SetDescription(md.Key.Description) + destMetric.SetUnit(md.Unit) + destExpHist = destMetric.SetEmptyExponentialHistogram() + destExpHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + destExpHist.DataPoints().EnsureCapacity(len(dpMap)) + case md.ExplicitHistogram != nil: + destMetric := metrics.AppendEmpty() + destMetric.SetName(md.Key.Name) + destMetric.SetDescription(md.Key.Description) + destMetric.SetUnit(md.Unit) + destExplicitHist = destMetric.SetEmptyHistogram() + destExplicitHist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + destExplicitHist.DataPoints().EnsureCapacity(len(dpMap)) + } + for _, dp := range dpMap { + dp.Copy( + a.timestamp, + destExpHist, + destExplicitHist, + ) + } + } + for resID, dpMap := range a.sums[md.Key] { + if md.Sum == nil { + continue + } + metrics := a.smLookup[resID].Metrics() + destMetric := metrics.AppendEmpty() + destMetric.SetName(md.Key.Name) + destMetric.SetDescription(md.Key.Description) + destMetric.SetUnit(md.Unit) + destCounter := destMetric.SetEmptySum() + destCounter.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + destCounter.DataPoints().EnsureCapacity(len(dpMap)) + for _, dp := range dpMap { + dp.Copy(a.timestamp, destCounter.DataPoints().AppendEmpty()) + } + } + // If there are two metric defined with the same key required by metricKey + // then they will be aggregated within the same metric and produced + // together. Deleting the key ensures this while preventing duplicates. + delete(a.valueCounts, md.Key) + delete(a.sums, md.Key) + } +} + +func (a *Aggregator[K]) aggregateInt( + md model.MetricDef[K], + resAttrs, srcAttrs pcommon.Map, + v int64, +) error { + resID := a.getResourceID(resAttrs) + attrID := pdatautil.MapHash(srcAttrs) + if _, ok := a.sums[md.Key]; !ok { + a.sums[md.Key] = make(map[[16]byte]map[[16]byte]*sumDP) + } + if _, ok := a.sums[md.Key][resID]; !ok { + a.sums[md.Key][resID] = make(map[[16]byte]*sumDP) + } + if _, ok := a.sums[md.Key][resID][attrID]; !ok { + a.sums[md.Key][resID][attrID] = newSumDP(srcAttrs, false) + } + a.sums[md.Key][resID][attrID].AggregateInt(v) + return nil +} + +func (a *Aggregator[K]) aggregateDouble( + md model.MetricDef[K], + resAttrs, srcAttrs pcommon.Map, + v float64, +) error { + resID := a.getResourceID(resAttrs) + attrID := pdatautil.MapHash(srcAttrs) + if _, ok := a.sums[md.Key]; !ok { + a.sums[md.Key] = make(map[[16]byte]map[[16]byte]*sumDP) + } + if _, ok := a.sums[md.Key][resID]; !ok { + a.sums[md.Key][resID] = make(map[[16]byte]*sumDP) + } + if _, ok := a.sums[md.Key][resID][attrID]; !ok { + a.sums[md.Key][resID][attrID] = newSumDP(srcAttrs, true) + } + a.sums[md.Key][resID][attrID].AggregateDouble(v) + return nil +} + +func (a *Aggregator[K]) aggregateValueCount( + md model.MetricDef[K], + resAttrs, srcAttrs pcommon.Map, + value float64, count int64, +) error { + if count == 0 { + // Nothing to record as count is zero + return nil + } + resID := a.getResourceID(resAttrs) + attrID := pdatautil.MapHash(srcAttrs) + if _, ok := a.valueCounts[md.Key]; !ok { + a.valueCounts[md.Key] = make(map[[16]byte]map[[16]byte]*valueCountDP) + } + if _, ok := a.valueCounts[md.Key][resID]; !ok { + a.valueCounts[md.Key][resID] = make(map[[16]byte]*valueCountDP) + } + if _, ok := a.valueCounts[md.Key][resID][attrID]; !ok { + a.valueCounts[md.Key][resID][attrID] = newValueCountDP(md, srcAttrs) + } + a.valueCounts[md.Key][resID][attrID].Aggregate(value, count) + return nil +} + +func (a *Aggregator[K]) getResourceID(resourceAttrs pcommon.Map) [16]byte { + resID := pdatautil.MapHash(resourceAttrs) + if _, ok := a.smLookup[resID]; !ok { + destResourceMetric := a.result.ResourceMetrics().AppendEmpty() + destResAttrs := destResourceMetric.Resource().Attributes() + destResAttrs.EnsureCapacity(resourceAttrs.Len() + 1) + resourceAttrs.CopyTo(destResAttrs) + destScopeMetric := destResourceMetric.ScopeMetrics().AppendEmpty() + destScopeMetric.Scope().SetName(metadata.ScopeName) + a.smLookup[resID] = destScopeMetric + } + return resID +} + +// getValueCount evaluates OTTL to get count and value respectively. Count is +// optional and defaults to the default count if the OTTL statement for count +// is missing. Value is required and returns an error if OTTL statement for +// value is missing. +func getValueCount[K any]( + ctx context.Context, tCtx K, + valueExpr, countExpr *ottl.Statement[K], + defaultCount int64, +) (float64, int64, error) { + val, err := getDoubleFromOTTL(ctx, tCtx, valueExpr) + if err != nil { + return 0, 0, fmt.Errorf("failed to get value from OTTL: %w", err) + } + count := defaultCount + if countExpr != nil { + count, err = getIntFromOTTL(ctx, tCtx, countExpr) + if err != nil { + return 0, 0, fmt.Errorf("failed to get count from OTTL: %w", err) + } + } + return val, count, nil +} + +func getIntFromOTTL[K any]( + ctx context.Context, + tCtx K, + s *ottl.Statement[K], +) (int64, error) { + if s == nil { + return 0, nil + } + raw, _, err := s.Execute(ctx, tCtx) + if err != nil { + return 0, err + } + switch v := raw.(type) { + case int64: + return v, nil + case float64: + return int64(v), nil + default: + return 0, fmt.Errorf( + "failed to parse int OTTL value, expression returned value of type %T: %v", + v, v, + ) + } +} + +func getDoubleFromOTTL[K any]( + ctx context.Context, + tCtx K, + s *ottl.Statement[K], +) (float64, error) { + if s == nil { + return 0, nil + } + raw, _, err := s.Execute(ctx, tCtx) + if err != nil { + return 0, err + } + switch v := raw.(type) { + case float64: + return v, nil + case int64: + return float64(v), nil + default: + return 0, fmt.Errorf( + "failed to parse double OTTL value, expression returned value of type %T: %v", + v, v, + ) + } +} diff --git a/connector/signaltometricsconnector/internal/aggregator/exphistogram.go b/connector/signaltometricsconnector/internal/aggregator/exphistogram.go new file mode 100644 index 000000000000..f1728ff20874 --- /dev/null +++ b/connector/signaltometricsconnector/internal/aggregator/exphistogram.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" + +import ( + "time" + + "github.com/lightstep/go-expohisto/structure" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type exponentialHistogramDP struct { + attrs pcommon.Map + data *structure.Histogram[float64] +} + +func newExponentialHistogramDP(attrs pcommon.Map, maxSize int32) *exponentialHistogramDP { + return &exponentialHistogramDP{ + attrs: attrs, + data: structure.NewFloat64( + structure.NewConfig(structure.WithMaxSize(maxSize)), + ), + } +} + +func (dp *exponentialHistogramDP) Aggregate(value float64, count int64) { + dp.data.UpdateByIncr(value, uint64(count)) +} + +func (dp *exponentialHistogramDP) Copy( + timestamp time.Time, + dest pmetric.ExponentialHistogramDataPoint, +) { + dp.attrs.CopyTo(dest.Attributes()) + dest.SetZeroCount(dp.data.ZeroCount()) + dest.SetScale(dp.data.Scale()) + dest.SetCount(dp.data.Count()) + dest.SetSum(dp.data.Sum()) + if dp.data.Count() > 0 { + dest.SetMin(dp.data.Min()) + dest.SetMax(dp.data.Max()) + } + // TODO determine appropriate start time + dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + + copyBucketRange(dp.data.Positive(), dest.Positive()) + copyBucketRange(dp.data.Negative(), dest.Negative()) +} + +// copyBucketRange copies a bucket range from exponential histogram +// datastructure to the OTel representation. +func copyBucketRange( + src *structure.Buckets, + dest pmetric.ExponentialHistogramDataPointBuckets, +) { + dest.SetOffset(src.Offset()) + dest.BucketCounts().EnsureCapacity(int(src.Len())) + for i := uint32(0); i < src.Len(); i++ { + dest.BucketCounts().Append(src.At(i)) + } +} diff --git a/connector/signaltometricsconnector/internal/aggregator/explicithistogram.go b/connector/signaltometricsconnector/internal/aggregator/explicithistogram.go new file mode 100644 index 000000000000..23ce09c5f6ca --- /dev/null +++ b/connector/signaltometricsconnector/internal/aggregator/explicithistogram.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" + +import ( + "sort" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +type explicitHistogramDP struct { + attrs pcommon.Map + + sum float64 + count uint64 + + // bounds represents the explicitly defined boundaries for the histogram + // bucket. The boundaries for a bucket at index i are: + // + // (-Inf, bounds[i]] for i == 0 + // (bounds[i-1], bounds[i]] for 0 < i < len(bounds) + // (bounds[i-1], +Inf) for i == len(bounds) + // + // Based on above representation, a bounds of length n represents n+1 buckets. + bounds []float64 + + // counts represents the count values of histogram for each bucket. The sum of + // counts across all buckets must be equal to the count variable. The length of + // counts must be one greather than the length of bounds slice. + counts []uint64 +} + +func newExplicitHistogramDP(attrs pcommon.Map, bounds []float64) *explicitHistogramDP { + return &explicitHistogramDP{ + attrs: attrs, + bounds: bounds, + counts: make([]uint64, len(bounds)+1), + } +} + +func (dp *explicitHistogramDP) Aggregate(value float64, count int64) { + dp.sum += value * float64(count) + dp.count += uint64(count) + dp.counts[sort.SearchFloat64s(dp.bounds, value)] += uint64(count) +} + +func (dp *explicitHistogramDP) Copy( + timestamp time.Time, + dest pmetric.HistogramDataPoint, +) { + dp.attrs.CopyTo(dest.Attributes()) + dest.ExplicitBounds().FromRaw(dp.bounds) + dest.BucketCounts().FromRaw(dp.counts) + dest.SetCount(dp.count) + dest.SetSum(dp.sum) + // TODO determine appropriate start time + dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) +} diff --git a/connector/signaltometricsconnector/internal/aggregator/sumdp.go b/connector/signaltometricsconnector/internal/aggregator/sumdp.go new file mode 100644 index 000000000000..332e5249bcc0 --- /dev/null +++ b/connector/signaltometricsconnector/internal/aggregator/sumdp.go @@ -0,0 +1,55 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" + +import ( + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +// sumDP counts the number of events (supports all event types) +type sumDP struct { + attrs pcommon.Map + + isDbl bool + intVal int64 + dblVal float64 +} + +func newSumDP(attrs pcommon.Map, isDbl bool) *sumDP { + return &sumDP{ + isDbl: isDbl, + attrs: attrs, + } +} + +func (dp *sumDP) AggregateInt(v int64) { + if dp.isDbl { + panic("unexpected usage of sum datapoint, only integer value expected") + } + dp.intVal += v +} + +func (dp *sumDP) AggregateDouble(v float64) { + if !dp.isDbl { + panic("unexpected usage of sum datapoint, only double value expected") + } + dp.dblVal += v +} + +func (dp *sumDP) Copy( + timestamp time.Time, + dest pmetric.NumberDataPoint, +) { + dp.attrs.CopyTo(dest.Attributes()) + if dp.isDbl { + dest.SetDoubleValue(dp.dblVal) + } else { + dest.SetIntValue(dp.intVal) + } + // TODO determine appropriate start time + dest.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) +} diff --git a/connector/signaltometricsconnector/internal/aggregator/valuecountdp.go b/connector/signaltometricsconnector/internal/aggregator/valuecountdp.go new file mode 100644 index 000000000000..09a342390d92 --- /dev/null +++ b/connector/signaltometricsconnector/internal/aggregator/valuecountdp.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package aggregator // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/aggregator" +import ( + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" +) + +// valueCountDP is a wrapper DP to aggregate all datapoints that record +// value and count. +type valueCountDP struct { + expHistogramDP *exponentialHistogramDP + explicitHistogramDP *explicitHistogramDP +} + +func newValueCountDP[K any]( + md model.MetricDef[K], + attrs pcommon.Map, +) *valueCountDP { + var dp valueCountDP + if md.ExponentialHistogram != nil { + dp.expHistogramDP = newExponentialHistogramDP( + attrs, md.ExponentialHistogram.MaxSize, + ) + } + if md.ExplicitHistogram != nil { + dp.explicitHistogramDP = newExplicitHistogramDP( + attrs, md.ExplicitHistogram.Buckets, + ) + } + return &dp +} + +func (dp *valueCountDP) Aggregate(value float64, count int64) { + if dp.expHistogramDP != nil { + dp.expHistogramDP.Aggregate(value, count) + } + if dp.explicitHistogramDP != nil { + dp.explicitHistogramDP.Aggregate(value, count) + } +} + +func (dp *valueCountDP) Copy( + timestamp time.Time, + destExpHist pmetric.ExponentialHistogram, + destExplicitHist pmetric.Histogram, +) { + if dp.expHistogramDP != nil { + dp.expHistogramDP.Copy(timestamp, destExpHist.DataPoints().AppendEmpty()) + } + if dp.explicitHistogramDP != nil { + dp.explicitHistogramDP.Copy(timestamp, destExplicitHist.DataPoints().AppendEmpty()) + } +} diff --git a/connector/signaltometricsconnector/internal/model/model.go b/connector/signaltometricsconnector/internal/model/model.go new file mode 100644 index 000000000000..16d05b129eb5 --- /dev/null +++ b/connector/signaltometricsconnector/internal/model/model.go @@ -0,0 +1,245 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package model // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/model" + +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config" + "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/internal/customottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" +) + +type AttributeKeyValue struct { + Key string + DefaultValue pcommon.Value +} + +type MetricKey struct { + Name string + Description string +} + +type ExplicitHistogram[K any] struct { + Buckets []float64 + Count *ottl.Statement[K] + Value *ottl.Statement[K] +} + +func (h *ExplicitHistogram[K]) fromConfig( + mi *config.Histogram, + parser ottl.Parser[K], +) error { + if mi == nil { + return nil + } + + var err error + h.Buckets = mi.Buckets + if mi.Count != "" { + h.Count, err = parser.ParseStatement(customottl.ConvertToStatement(mi.Count)) + if err != nil { + return fmt.Errorf("failed to parse count statement for explicit histogram: %w", err) + } + } + h.Value, err = parser.ParseStatement(customottl.ConvertToStatement(mi.Value)) + if err != nil { + return fmt.Errorf("failed to parse value statement for explicit histogram: %w", err) + } + return nil +} + +type ExponentialHistogram[K any] struct { + MaxSize int32 + Count *ottl.Statement[K] + Value *ottl.Statement[K] +} + +func (h *ExponentialHistogram[K]) fromConfig( + mi *config.ExponentialHistogram, + parser ottl.Parser[K], +) error { + if mi == nil { + return nil + } + + var err error + h.MaxSize = mi.MaxSize + if mi.Count != "" { + h.Count, err = parser.ParseStatement(customottl.ConvertToStatement(mi.Count)) + if err != nil { + return fmt.Errorf("failed to parse count statement for exponential histogram: %w", err) + } + } + h.Value, err = parser.ParseStatement(customottl.ConvertToStatement(mi.Value)) + if err != nil { + return fmt.Errorf("failed to parse value statement for exponential histogram: %w", err) + } + return nil +} + +type Sum[K any] struct { + Value *ottl.Statement[K] +} + +func (s *Sum[K]) fromConfig( + mi *config.Sum, + parser ottl.Parser[K], +) error { + if mi == nil { + return nil + } + + var err error + s.Value, err = parser.ParseStatement(customottl.ConvertToStatement(mi.Value)) + if err != nil { + return fmt.Errorf("failed to parse value statement for sum: %w", err) + } + return nil +} + +type MetricDef[K any] struct { + Key MetricKey + Unit string + IncludeResourceAttributes []AttributeKeyValue + Attributes []AttributeKeyValue + Conditions *ottl.ConditionSequence[K] + ExponentialHistogram *ExponentialHistogram[K] + ExplicitHistogram *ExplicitHistogram[K] + Sum *Sum[K] +} + +func (md *MetricDef[K]) FromMetricInfo( + mi config.MetricInfo, + parser ottl.Parser[K], + telemetrySettings component.TelemetrySettings, +) error { + md.Key.Name = mi.Name + md.Key.Description = mi.Description + md.Unit = mi.Unit + + var err error + md.IncludeResourceAttributes, err = parseAttributeConfigs(mi.IncludeResourceAttributes) + if err != nil { + return fmt.Errorf("failed to parse include resource attribute config: %w", err) + } + md.Attributes, err = parseAttributeConfigs(mi.Attributes) + if err != nil { + return fmt.Errorf("failed to parse attribute config: %w", err) + } + if len(mi.Conditions) > 0 { + conditions, err := parser.ParseConditions(mi.Conditions) + if err != nil { + return fmt.Errorf("failed to parse OTTL conditions: %w", err) + } + condSeq := ottl.NewConditionSequence( + conditions, + telemetrySettings, + ottl.WithLogicOperation[K](ottl.Or), + ) + md.Conditions = &condSeq + } + if mi.Histogram != nil { + md.ExplicitHistogram = new(ExplicitHistogram[K]) + if err := md.ExplicitHistogram.fromConfig(mi.Histogram, parser); err != nil { + return fmt.Errorf("failed to parse histogram config: %w", err) + } + } + if mi.ExponentialHistogram != nil { + md.ExponentialHistogram = new(ExponentialHistogram[K]) + if err := md.ExponentialHistogram.fromConfig(mi.ExponentialHistogram, parser); err != nil { + return fmt.Errorf("failed to parse histogram config: %w", err) + } + } + if mi.Sum != nil { + md.Sum = new(Sum[K]) + if err := md.Sum.fromConfig(mi.Sum, parser); err != nil { + return fmt.Errorf("failed to parse sum config: %w", err) + } + } + return nil +} + +// FilterResourceAttributes filters resource attributes based on the +// `IncludeResourceAttributes` list for the metric definition. Resource +// attributes are only filtered if the list is specified, otherwise all the +// resource attributes are used for creating the metrics from the metric +// definition. +func (md *MetricDef[K]) FilterResourceAttributes( + attrs pcommon.Map, +) pcommon.Map { + var filteredAttributes pcommon.Map + switch { + case len(md.IncludeResourceAttributes) == 0: + filteredAttributes = pcommon.NewMap() + filteredAttributes.EnsureCapacity(attrs.Len()) + attrs.CopyTo(filteredAttributes) + default: + expectedLen := len(md.IncludeResourceAttributes) + filteredAttributes = filterAttributes(attrs, md.IncludeResourceAttributes, expectedLen) + } + return filteredAttributes +} + +// FilterAttributes filters event attributes (datapoint, logrecord, spans) +// based on the `Attributes` selected for the metric definition. If no +// attributes are selected then an empty `pcommon.Map` is returned. Note +// that, this filtering differs from resource attribute filtering as +// in attribute filtering if any of the configured attributes is not present +// in the data being processed then that metric definition is not processed. +// The method returns a bool signaling if the filter was successful and metric +// should be processed. If the bool value is false then the returned map +// should not be used. +func (md *MetricDef[K]) FilterAttributes(attrs pcommon.Map) (pcommon.Map, bool) { + // Figure out if all the attributes are available, saves allocation + for _, filter := range md.Attributes { + if filter.DefaultValue.Type() != pcommon.ValueTypeEmpty { + // will always add an attribute + continue + } + if _, ok := attrs.Get(filter.Key); !ok { + return pcommon.Map{}, false + } + } + return filterAttributes(attrs, md.Attributes, len(md.Attributes)), true +} + +func filterAttributes(attrs pcommon.Map, filters []AttributeKeyValue, expectedLen int) pcommon.Map { + filteredAttrs := pcommon.NewMap() + filteredAttrs.EnsureCapacity(expectedLen) + for _, filter := range filters { + if attr, ok := attrs.Get(filter.Key); ok { + attr.CopyTo(filteredAttrs.PutEmpty(filter.Key)) + continue + } + if filter.DefaultValue.Type() != pcommon.ValueTypeEmpty { + filter.DefaultValue.CopyTo(filteredAttrs.PutEmpty(filter.Key)) + } + } + return filteredAttrs +} + +func parseAttributeConfigs(cfgs []config.Attribute) ([]AttributeKeyValue, error) { + var errs []error + kvs := make([]AttributeKeyValue, len(cfgs)) + for i, attr := range cfgs { + val := pcommon.NewValueEmpty() + if err := val.FromRaw(attr.DefaultValue); err != nil { + errs = append(errs, err) + } + kvs[i] = AttributeKeyValue{ + Key: attr.Key, + DefaultValue: val, + } + } + + if len(errs) > 0 { + return nil, errors.Join(errs...) + } + return kvs, nil +} diff --git a/connector/signaltometricsconnector/testdata/logs/exponential_histograms/config.yaml b/connector/signaltometricsconnector/testdata/logs/exponential_histograms/config.yaml new file mode 100644 index 000000000000..1dea4b6182a5 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/exponential_histograms/config.yaml @@ -0,0 +1,40 @@ +signaltometrics: + logs: + - name: total.logrecords.exphistogram + description: Logrecords as exponential histogram with log.duration from attributes + exponential_histogram: + count: "1" + value: attributes["log.duration"] + - name: total.logrecords.resource.foo.exphistogram + description: Logrecords with resource attribute foo as exponential histogram with log.duration from attributes + include_resource_attributes: + - key: resource.foo + exponential_histogram: + count: "1" + value: attributes["log.duration"] + - name: log.foo.exphistogram + description: Count total number of log records as per log.foo attribute as exponential histogram with log.duration from attributes + attributes: + - key: log.foo + exponential_histogram: + count: "1" + value: attributes["log.duration"] + - name: log.bar.exphistogram + description: Count total number of log records as per log.bar attribute as exponential histogram with log.duration from attributes + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: log.bar + exponential_histogram: + count: "1" + value: attributes["log.duration"] + - name: ignored.exphistogram + description: Will be ignored due to conditions evaluating to false + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: log.bar + exponential_histogram: + count: "2" + value: attributes["log.duration"] diff --git a/connector/signaltometricsconnector/testdata/logs/exponential_histograms/output.yaml b/connector/signaltometricsconnector/testdata/logs/exponential_histograms/output.yaml new file mode 100644 index 000000000000..c0e6e450b560 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/exponential_histograms/output.yaml @@ -0,0 +1,473 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Logrecords as exponential histogram with log.duration from attributes + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - count: "4" + max: 101.5 + min: 7 + negative: {} + positive: + bucketCountsoffset: 89 + scale: 5 + sum: 128 + timeUnixNano: "1000000" + name: total.logrecords.exphistogram + - description: Count total number of log records as per log.foo attribute as exponential histogram with log.duration from attributes + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: log.foo + value: + stringValue: foo + count: "2" + max: 101.5 + min: 11.4 + negative: {} + positive: + bucketCountsoffset: 112 + scale: 5 + sum: 112.9 + timeUnixNano: "1000000" + - attributes: + - key: log.foo + value: + stringValue: notfoo + count: "1" + max: 8.1 + min: 8.1 + negative: {} + positive: + bucketCounts: + - "1" + offset: 3.16452e+06 + scale: 20 + sum: 8.1 + timeUnixNano: "1000000" + name: log.foo.exphistogram + - description: Count total number of log records as per log.bar attribute as exponential histogram with log.duration from attributes + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: log.bar + value: + stringValue: bar + count: "1" + max: 101.5 + min: 101.5 + negative: {} + positive: + bucketCounts: + - "1" + offset: 6.989111e+06 + scale: 20 + sum: 101.5 + timeUnixNano: "1000000" + - attributes: + - key: log.bar + value: + stringValue: notbar + count: "1" + max: 11.4 + min: 11.4 + negative: {} + positive: + bucketCounts: + - "1" + offset: 3.68151e+06 + scale: 20 + sum: 11.4 + timeUnixNano: "1000000" + name: log.bar.exphistogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Logrecords with resource attribute foo as exponential histogram with log.duration from attributes + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - count: "4" + max: 101.5 + min: 7 + negative: {} + positive: + bucketCountsoffset: 89 + scale: 5 + sum: 128 + timeUnixNano: "1000000" + name: total.logrecords.resource.foo.exphistogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/logs/histograms/config.yaml b/connector/signaltometricsconnector/testdata/logs/histograms/config.yaml new file mode 100644 index 000000000000..60bb229d9bff --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/histograms/config.yaml @@ -0,0 +1,45 @@ +signaltometrics: + logs: + - name: total.logrecords.histogram + description: Logrecords as histogram with log.duration from attributes + histogram: + count: "1" + value: attributes["log.duration"] + buckets: [1, 10, 50, 100, 200] + - name: total.logrecords.resource.foo.histogram + description: Logrecords with resource attribute foo as histogram with log.duration from attributes + include_resource_attributes: + - key: resource.foo + histogram: + count: "1" + value: attributes["log.duration"] + buckets: [1, 10, 50, 100, 200] + - name: log.foo.histogram + description: Count total number of log records as per log.foo attribute as histogram with log.duration from attributes + attributes: + - key: log.foo + histogram: + count: "1" + value: attributes["log.duration"] + buckets: [1, 10, 50, 100, 200] + - name: log.bar.histogram + description: Count total number of log records as per log.bar attribute as histogram with log.duration from attributes + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: log.bar + histogram: + count: "1" + value: attributes["log.duration"] + buckets: [1, 10, 50, 100, 200] + - name: ignored.histogram + description: Will be ignored due to conditions evaluating to false + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: log.bar + histogram: + count: "2" + value: attributes["log.duration"] + buckets: [1, 50, 200] diff --git a/connector/signaltometricsconnector/testdata/logs/histograms/output.yaml b/connector/signaltometricsconnector/testdata/logs/histograms/output.yaml new file mode 100644 index 000000000000..45d0973f3eed --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/histograms/output.yaml @@ -0,0 +1,154 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Logrecords as histogram with log.duration from attributes + histogram: + aggregationTemporality: 1 + dataPoints: + - bucketCounts: + - "0" + - "2" + - "1" + - "0" + - "1" + - "0" + count: "4" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 128 + timeUnixNano: "1000000" + name: total.logrecords.histogram + - description: Count total number of log records as per log.foo attribute as histogram with log.duration from attributes + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: log.foo + value: + stringValue: foo + bucketCounts: + - "0" + - "0" + - "1" + - "0" + - "1" + - "0" + count: "2" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 112.9 + timeUnixNano: "1000000" + - attributes: + - key: log.foo + value: + stringValue: notfoo + bucketCounts: + - "0" + - "1" + - "0" + - "0" + - "0" + - "0" + count: "1" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 8.1 + timeUnixNano: "1000000" + name: log.foo.histogram + - description: Count total number of log records as per log.bar attribute as histogram with log.duration from attributes + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: log.bar + value: + stringValue: bar + bucketCounts: + - "0" + - "0" + - "0" + - "0" + - "1" + - "0" + count: "1" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 101.5 + timeUnixNano: "1000000" + - attributes: + - key: log.bar + value: + stringValue: notbar + bucketCounts: + - "0" + - "0" + - "1" + - "0" + - "0" + - "0" + count: "1" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 11.4 + timeUnixNano: "1000000" + name: log.bar.histogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Logrecords with resource attribute foo as histogram with log.duration from attributes + histogram: + aggregationTemporality: 1 + dataPoints: + - bucketCounts: + - "0" + - "2" + - "1" + - "0" + - "1" + - "0" + count: "4" + explicitBounds: + - 1 + - 10 + - 50 + - 100 + - 200 + sum: 128 + timeUnixNano: "1000000" + name: total.logrecords.resource.foo.histogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/logs/logs.yaml b/connector/signaltometricsconnector/testdata/logs/logs.yaml new file mode 100644 index 000000000000..2df08286fbee --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/logs.yaml @@ -0,0 +1,63 @@ +resourceLogs: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + - key: resource.bar + value: + stringValue: bar + scopeLogs: + - logRecords: + - attributes: + - key: log.foo + value: + stringValue: foo + - key: log.bar + value: + stringValue: bar + - key: log.duration + value: + doubleValue: 101.5 + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.foo + value: + stringValue: foo + - key: log.bar + value: + stringValue: notbar + - key: log.duration + value: + doubleValue: 11.4 + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.foo + value: + stringValue: notfoo + - key: log.duration + value: + doubleValue: 8.1 + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + - attributes: + - key: log.duration + value: + doubleValue: 7 + body: + stringValue: This is a log message + spanId: "" + timeUnixNano: "1581452773000000789" + traceId: "" + scope: {} diff --git a/connector/signaltometricsconnector/testdata/logs/sum/config.yaml b/connector/signaltometricsconnector/testdata/logs/sum/config.yaml new file mode 100644 index 000000000000..7b9e20403a14 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/sum/config.yaml @@ -0,0 +1,35 @@ +signaltometrics: + logs: + - name: total.logrecords.sum + description: Count total number of log records + sum: + value: "1" + - name: total.logrecords.resource.foo.sum + description: Count total number of log records with resource attribute foo + include_resource_attributes: + - key: resource.foo + sum: + value: "1" + - name: log.foo.sum + description: Count total number of log records as per log.foo attribute + attributes: + - key: log.foo + sum: + value: "1" + - name: log.bar.sum + description: Count total number of log records as per log.bar attribute + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: log.bar + sum: + value: "1" + - name: ignored.sum + description: Will be ignored due to conditions evaluating to false + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: log.bar + sum: + value: "2" diff --git a/connector/signaltometricsconnector/testdata/logs/sum/output.yaml b/connector/signaltometricsconnector/testdata/logs/sum/output.yaml new file mode 100644 index 000000000000..f575a5231e79 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/logs/sum/output.yaml @@ -0,0 +1,70 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Count total number of log records + name: total.logrecords.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "4" + timeUnixNano: "1000000" + - description: Count total number of log records as per log.foo attribute + name: log.foo.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "2" + attributes: + - key: log.foo + value: + stringValue: foo + timeUnixNano: "1000000" + - asInt: "1" + attributes: + - key: log.foo + value: + stringValue: notfoo + timeUnixNano: "1000000" + - description: Count total number of log records as per log.bar attribute + name: log.bar.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: log.bar + value: + stringValue: bar + timeUnixNano: "1000000" + - asInt: "1" + attributes: + - key: log.bar + value: + stringValue: notbar + timeUnixNano: "1000000" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Count total number of log records with resource attribute foo + name: total.logrecords.resource.foo.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "4" + timeUnixNano: "1000000" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/config.yaml b/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/config.yaml new file mode 100644 index 000000000000..1b2e868fd80b --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/config.yaml @@ -0,0 +1,13 @@ +signaltometrics: + datapoints: + - name: gauge.to.exphistogram + description: An exponential histogram created from gague values + include_resource_attributes: + - key: resource.foo + attributes: + - key: datapoint.foo + conditions: + - metric.type == 1 # select all gauges + exponential_histogram: + count: "1" # 1 count for each datapoint + value: Double(value_int) + value_double # handle both int and double diff --git a/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/output.yaml b/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/output.yaml new file mode 100644 index 000000000000..42b210383c32 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/exponential_histograms/output.yaml @@ -0,0 +1,288 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: An exponential histogram created from gague values + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: datapoint.foo + value: + stringValue: foo + count: "4" + max: 456 + min: 1.23 + negative: {} + positive: + bucketCountsoffset: 4 + scale: 4 + sum: 584.79 + timeUnixNano: "1000000" + - attributes: + - key: datapoint.foo + value: + stringValue: notfoo + count: "2" + max: 789 + min: 7.89 + negative: {} + positive: + bucketCountsoffset: 47 + scale: 4 + sum: 796.89 + timeUnixNano: "1000000" + name: gauge.to.exphistogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/metrics/histograms/config.yaml b/connector/signaltometricsconnector/testdata/metrics/histograms/config.yaml new file mode 100644 index 000000000000..1b1932008d0b --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/histograms/config.yaml @@ -0,0 +1,15 @@ +signaltometrics: + datapoints: + - name: gauge.to.histogram + description: A histogram created from gague values + include_resource_attributes: + - key: resource.foo + attributes: + - key: datapoint.foo + conditions: + - metric.type == 1 # select all gauges + histogram: + buckets: [1, 4, 5, 8, 200, 500, 1000] + count: "1" # 1 count for each datapoint + value: Double(value_int) + value_double # handle both int and double + diff --git a/connector/signaltometricsconnector/testdata/metrics/histograms/output.yaml b/connector/signaltometricsconnector/testdata/metrics/histograms/output.yaml new file mode 100644 index 000000000000..2cda97a02d15 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/histograms/output.yaml @@ -0,0 +1,63 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: A histogram created from gague values + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: datapoint.foo + value: + stringValue: foo + bucketCounts: + - "0" + - "1" + - "1" + - "0" + - "1" + - "1" + - "0" + - "0" + count: "4" + explicitBounds: + - 1 + - 4 + - 5 + - 8 + - 200 + - 500 + - 1000 + sum: 584.79 + timeUnixNano: "1000000" + - attributes: + - key: datapoint.foo + value: + stringValue: notfoo + bucketCounts: + - "0" + - "0" + - "0" + - "1" + - "0" + - "0" + - "1" + - "0" + count: "2" + explicitBounds: + - 1 + - 4 + - 5 + - 8 + - 200 + - 500 + - 1000 + sum: 796.89 + timeUnixNano: "1000000" + name: gauge.to.histogram + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/metrics/metrics.yaml b/connector/signaltometricsconnector/testdata/metrics/metrics.yaml new file mode 100644 index 000000000000..d51bda6edf7f --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/metrics.yaml @@ -0,0 +1,226 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + - key: resource.bar + value: + stringValue: bar + scopeMetrics: + - metrics: + - gauge: + dataPoints: + - asInt: "123" + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "456" + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "789" + attributes: + - key: datapoint.foo + value: + stringValue: notfoo + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "0" + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + name: gauge-int + unit: "1" + - gauge: + dataPoints: + - asDouble: 1.23 + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 4.56 + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 7.89 + attributes: + - key: datapoint.foo + value: + stringValue: notfoo + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 0 + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + name: gauge-double + unit: "1" + - name: counter-int + sum: + aggregationTemporality: 2 + dataPoints: + - asInt: "123" + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "456" + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "789" + attributes: + - key: datapoint.foo + value: + stringValue: notfoo + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asInt: "0" + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + isMonotonic: true + unit: "1" + - name: counter-double + sum: + aggregationTemporality: 2 + dataPoints: + - asDouble: 1.23 + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 4.56 + attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 4.56 + attributes: + - key: datapoint.foo + value: + stringValue: notfoo + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + - asDouble: 0 + startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + isMonotonic: true + unit: "1" + - histogram: + aggregationTemporality: 2 + dataPoints: + - attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + count: "1" + startTimeUnixNano: "1581452772000000321" + sum: 15 + timeUnixNano: "1581452773000000789" + - attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + count: "2" + startTimeUnixNano: "1581452772000000321" + sum: 30 + timeUnixNano: "1581452773000000789" + - attributes: + - key: datapoint.foo + value: + stringValue: notfoo + count: "3" + startTimeUnixNano: "1581452772000000321" + sum: 45 + timeUnixNano: "1581452773000000789" + - startTimeUnixNano: "1581452772000000321" + sum: 0 + timeUnixNano: "1581452773000000789" + name: double-histogram + unit: "1" + - name: double-summary + summary: + dataPoints: + - attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: bar + count: "1" + startTimeUnixNano: "1581452772000000321" + sum: 15 + timeUnixNano: "1581452773000000789" + - attributes: + - key: datapoint.foo + value: + stringValue: foo + - key: datapoint.bar + value: + stringValue: notbar + count: "2" + startTimeUnixNano: "1581452772000000321" + sum: 30 + timeUnixNano: "1581452773000000789" + - attributes: + - key: datapoint.foo + value: + stringValue: notfoo + count: "3" + startTimeUnixNano: "1581452772000000321" + sum: 45 + timeUnixNano: "1581452773000000789" + - startTimeUnixNano: "1581452772000000321" + timeUnixNano: "1581452773000000789" + unit: "1" + scope: {} diff --git a/connector/signaltometricsconnector/testdata/metrics/sum/config.yaml b/connector/signaltometricsconnector/testdata/metrics/sum/config.yaml new file mode 100644 index 000000000000..0cc9cfb1abf9 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/sum/config.yaml @@ -0,0 +1,29 @@ +signaltometrics: + datapoints: + - name: total.datapoint.sum + description: Count total number of datapoints + sum: + value: "1" + - name: datapoint.foo.sum + description: Count total number of datapoints as per datapoint.foo attribute + attributes: + - key: datapoint.foo + sum: + value: "1" + - name: datapoint.bar.sum + description: Count total number of datapoints as per datapoint.bar attribute + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: datapoint.bar + sum: + value: "1" + - name: ignored.sum + description: Will be ignored due to conditions evaluating to false + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: datapoint.bar + sum: + value: "2" diff --git a/connector/signaltometricsconnector/testdata/metrics/sum/output.yaml b/connector/signaltometricsconnector/testdata/metrics/sum/output.yaml new file mode 100644 index 000000000000..ba8e962693e5 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/metrics/sum/output.yaml @@ -0,0 +1,54 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Count total number of datapoints + name: total.datapoint.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "24" + timeUnixNano: "1000000" + - description: Count total number of datapoints as per datapoint.foo attribute + name: datapoint.foo.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "12" + attributes: + - key: datapoint.foo + value: + stringValue: foo + timeUnixNano: "1000000" + - asInt: "6" + attributes: + - key: datapoint.foo + value: + stringValue: notfoo + timeUnixNano: "1000000" + - description: Count total number of datapoints as per datapoint.bar attribute + name: datapoint.bar.sum + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "6" + attributes: + - key: datapoint.bar + value: + stringValue: bar + timeUnixNano: "1000000" + - asInt: "6" + attributes: + - key: datapoint.bar + value: + stringValue: notbar + timeUnixNano: "1000000" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/traces/exponential_histograms/config.yaml b/connector/signaltometricsconnector/testdata/traces/exponential_histograms/config.yaml new file mode 100644 index 000000000000..641797fe0c62 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/exponential_histograms/config.yaml @@ -0,0 +1,54 @@ +signaltometrics: + spans: + - name: with_resource_foo_only + description: Spans with resource attribute including resource.foo as a exponential histogram metric + unit: ms + include_resource_attributes: + - key: resource.foo + exponential_histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: with_custom_count + description: Spans with custom count OTTL expression as a exponential histogram metric + unit: ms + exponential_histogram: + count: "2" # count each span twice + value: Milliseconds(end_time - start_time) + - name: http.trace.span.duration + description: Span duration for HTTP spans as a exponential histogram metric + unit: ms + attributes: + - key: http.response.status_code + exponential_histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: db.trace.span.duration + description: Span duration for DB spans as a exponential histogram metric + unit: ms + attributes: + - key: db.system + exponential_histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: msg.trace.span.duration + description: Span duration for messaging spans as a exponential histogram metric + unit: ms + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: messaging.system + exponential_histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: ignored.exphistogram + description: Will be ignored due to conditions evaluating to false + unit: ms + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: messaging.system + exponential_histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + diff --git a/connector/signaltometricsconnector/testdata/traces/exponential_histograms/output.yaml b/connector/signaltometricsconnector/testdata/traces/exponential_histograms/output.yaml new file mode 100644 index 000000000000..7904d70e5f7c --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/exponential_histograms/output.yaml @@ -0,0 +1,677 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Spans with resource attribute including resource.foo as a exponential histogram metric + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - count: "8" + max: 17000 + min: 2 + negative: {} + positive: + bucketCountsoffset: 7 + scale: 3 + sum: 31402 + timeUnixNano: "1000000" + name: with_resource_foo_only + unit: ms + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Spans with custom count OTTL expression as a exponential histogram metric + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - count: "14" + max: 17000 + min: 2 + negative: {} + positive: + bucketCountsoffset: 7 + scale: 3 + sum: 61804 + timeUnixNano: "1000000" + name: with_custom_count + unit: ms + - description: Span duration for HTTP spans as a exponential histogram metric + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: http.response.status_code + value: + intValue: "201" + count: "2" + max: 11000 + min: 900 + negative: {} + positive: + bucketCountsoffset: 314 + scale: 5 + sum: 11900 + timeUnixNano: "1000000" + name: http.trace.span.duration + unit: ms + - description: Span duration for DB spans as a exponential histogram metric + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: db.system + value: + stringValue: mysql + count: "4" + max: 1000 + min: 500 + negative: {} + positive: + bucketCountsoffset: 1147 + scale: 7 + sum: 2500 + timeUnixNano: "1000000" + name: db.trace.span.duration + unit: ms + - description: Span duration for messaging spans as a exponential histogram metric + exponentialHistogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: messaging.system + value: + stringValue: kafka + count: "2" + max: 17000 + min: 2 + negative: {} + positive: + bucketCountsoffset: 7 + scale: 3 + sum: 17002 + timeUnixNano: "1000000" + name: msg.trace.span.duration + unit: ms + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/traces/histograms/config.yaml b/connector/signaltometricsconnector/testdata/traces/histograms/config.yaml new file mode 100644 index 000000000000..ec7cdee7bbc6 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/histograms/config.yaml @@ -0,0 +1,53 @@ +signaltometrics: + spans: + - name: with_resource_foo_only + description: Spans with resource attribute including resource.foo as a histogram metric + unit: ms + include_resource_attributes: + - key: resource.foo + histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: with_custom_count + description: Spans with custom count OTTL expression as a histogram metric + unit: ms + histogram: + count: "2" # count each span twice + value: Milliseconds(end_time - start_time) + - name: http.trace.span.duration + description: Span duration for HTTP spans as a histogram metric + unit: ms + attributes: + - key: http.response.status_code + histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: db.trace.span.duration + description: Span duration for DB spans as a histogram metric + unit: ms + attributes: + - key: db.system + histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: msg.trace.span.duration + description: Span duration for messaging spans as a histogram metric + unit: ms + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: messaging.system + histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) + - name: ignored.histogram + description: Will be ignored due to conditions evaluating to false + unit: ms + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: messaging.system + histogram: + count: "Int(AdjustedCount())" + value: Milliseconds(end_time - start_time) diff --git a/connector/signaltometricsconnector/testdata/traces/histograms/output.yaml b/connector/signaltometricsconnector/testdata/traces/histograms/output.yaml new file mode 100644 index 000000000000..a25d144566c1 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/histograms/output.yaml @@ -0,0 +1,254 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Spans with resource attribute including resource.foo as a histogram metric + histogram: + aggregationTemporality: 1 + dataPoints: + - bucketCounts: + - "1" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "3" + - "2" + - "0" + - "0" + - "0" + - "0" + - "1" + - "1" + count: "8" + explicitBounds: + - 2 + - 4 + - 6 + - 8 + - 10 + - 50 + - 100 + - 200 + - 400 + - 800 + - 1000 + - 1400 + - 2000 + - 5000 + - 10000 + - 15000 + sum: 31402 + timeUnixNano: "1000000" + name: with_resource_foo_only + unit: ms + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Spans with custom count OTTL expression as a histogram metric + histogram: + aggregationTemporality: 1 + dataPoints: + - bucketCounts: + - "2" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "4" + - "4" + - "0" + - "0" + - "0" + - "0" + - "2" + - "2" + count: "14" + explicitBounds: + - 2 + - 4 + - 6 + - 8 + - 10 + - 50 + - 100 + - 200 + - 400 + - 800 + - 1000 + - 1400 + - 2000 + - 5000 + - 10000 + - 15000 + sum: 61804 + timeUnixNano: "1000000" + name: with_custom_count + unit: ms + - description: Span duration for HTTP spans as a histogram metric + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: http.response.status_code + value: + intValue: "201" + bucketCounts: + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "1" + - "0" + - "0" + - "0" + - "0" + - "1" + - "0" + count: "2" + explicitBounds: + - 2 + - 4 + - 6 + - 8 + - 10 + - 50 + - 100 + - 200 + - 400 + - 800 + - 1000 + - 1400 + - 2000 + - 5000 + - 10000 + - 15000 + sum: 11900 + timeUnixNano: "1000000" + name: http.trace.span.duration + unit: ms + - description: Span duration for DB spans as a histogram metric + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: db.system + value: + stringValue: mysql + bucketCounts: + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "3" + - "1" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + count: "4" + explicitBounds: + - 2 + - 4 + - 6 + - 8 + - 10 + - 50 + - 100 + - 200 + - 400 + - 800 + - 1000 + - 1400 + - 2000 + - 5000 + - 10000 + - 15000 + sum: 2500 + timeUnixNano: "1000000" + name: db.trace.span.duration + unit: ms + - description: Span duration for messaging spans as a histogram metric + histogram: + aggregationTemporality: 1 + dataPoints: + - attributes: + - key: messaging.system + value: + stringValue: kafka + bucketCounts: + - "1" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "0" + - "1" + count: "2" + explicitBounds: + - 2 + - 4 + - 6 + - 8 + - 10 + - 50 + - 100 + - 200 + - 400 + - 800 + - 1000 + - 1400 + - 2000 + - 5000 + - 10000 + - 15000 + sum: 17002 + timeUnixNano: "1000000" + name: msg.trace.span.duration + unit: ms + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector diff --git a/connector/signaltometricsconnector/testdata/traces/sum/config.yaml b/connector/signaltometricsconnector/testdata/traces/sum/config.yaml new file mode 100644 index 000000000000..fe1682e5baf6 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/sum/config.yaml @@ -0,0 +1,47 @@ +signaltometrics: + spans: + - name: with_resource_foo_only + description: Spans with resource attribute including resource.foo as a int sum metric + unit: s + include_resource_attributes: + - key: resource.foo + sum: + value: Int(Seconds(end_time - start_time)) + - name: span_adjusted_count + description: Adjusted count for the span as a sum metric + unit: s + sum: + value: Int(AdjustedCount()) + - name: http.trace.span.duration + description: Span duration for HTTP spans as a int sum metric + unit: s + attributes: + - key: http.response.status_code + sum: + value: Int(Seconds(end_time - start_time)) + - name: db.trace.span.duration + description: Span duration for DB spans as a int sum metric + unit: s + attributes: + - key: db.system + sum: + value: Int(Seconds(end_time - start_time)) + - name: msg.trace.span.duration + description: Span duration for messaging spans as a double sum metric + unit: s + conditions: # Will evaluate to true + - resource.attributes["404.attribute"] != nil + - resource.attributes["resource.foo"] != nil + attributes: + - key: messaging.system + sum: + value: Double(Seconds(end_time - start_time)) + - name: ignored.sum + description: Will be ignored due to conditions evaluating to false + unit: s + conditions: # Will evaluate to false + - resource.attributes["404.attribute"] != nil + attributes: + - key: messaging.system + sum: + value: Double(Seconds(end_time - start_time)) diff --git a/connector/signaltometricsconnector/testdata/traces/sum/output.yaml b/connector/signaltometricsconnector/testdata/traces/sum/output.yaml new file mode 100644 index 000000000000..5beb92e861f0 --- /dev/null +++ b/connector/signaltometricsconnector/testdata/traces/sum/output.yaml @@ -0,0 +1,74 @@ +resourceMetrics: + - resource: + attributes: + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Spans with resource attribute including resource.foo as a int sum metric + name: with_resource_foo_only + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "29" + timeUnixNano: "1000000" + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector + - resource: + attributes: + - key: resource.bar + value: + stringValue: bar + - key: resource.foo + value: + stringValue: foo + scopeMetrics: + - metrics: + - description: Adjusted count for the span as a sum metric + name: span_adjusted_count + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "8" + timeUnixNano: "1000000" + unit: s + - description: Span duration for HTTP spans as a int sum metric + name: http.trace.span.duration + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "11" + attributes: + - key: http.response.status_code + value: + intValue: "201" + timeUnixNano: "1000000" + unit: s + - description: Span duration for DB spans as a int sum metric + name: db.trace.span.duration + sum: + aggregationTemporality: 1 + dataPoints: + - asInt: "1" + attributes: + - key: db.system + value: + stringValue: mysql + timeUnixNano: "1000000" + unit: s + - description: Span duration for messaging spans as a double sum metric + name: msg.trace.span.duration + sum: + aggregationTemporality: 1 + dataPoints: + - asDouble: 17.002000935999998 + attributes: + - key: messaging.system + value: + stringValue: kafka + timeUnixNano: "1000000" + unit: s + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector From 6b818638c18fb5ff86430d3936a36d18da74bffc Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 16 Dec 2024 15:52:10 +0000 Subject: [PATCH 02/11] Fix lint --- connector/signaltometricsconnector/connector.go | 6 +++--- connector/signaltometricsconnector/connector_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/connector/signaltometricsconnector/connector.go b/connector/signaltometricsconnector/connector.go index fa314ece6bd8..f826304d97e5 100644 --- a/connector/signaltometricsconnector/connector.go +++ b/connector/signaltometricsconnector/connector.go @@ -63,7 +63,7 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) continue } - // The transform context is created from orginal attributes so that the + // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottlspan.NewTransformContext(span, scopeSpan.Scope(), resourceSpan.Resource(), scopeSpan, resourceSpan) if md.Conditions != nil { @@ -108,7 +108,7 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics for _, md := range sm.dpMetricDefs { filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) aggregate := func(dp any, dpAttrs pcommon.Map) error { - // The transform context is created from orginal attributes so that the + // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottldatapoint.NewTransformContext(dp, metric, metrics, scopeMetric.Scope(), resourceMetric.Resource(), scopeMetric, resourceMetric) if md.Conditions != nil { @@ -211,7 +211,7 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro continue } - // The transform context is created from orginal attributes so that the + // The transform context is created from original attributes so that the // OTTL expressions are also applied on the original attributes. tCtx := ottllog.NewTransformContext(log, scopeLog.Scope(), resourceLog.Resource(), scopeLog, resourceLog) if md.Conditions != nil { diff --git a/connector/signaltometricsconnector/connector_test.go b/connector/signaltometricsconnector/connector_test.go index a7a7a3046c42..0464492f131f 100644 --- a/connector/signaltometricsconnector/connector_test.go +++ b/connector/signaltometricsconnector/connector_test.go @@ -312,9 +312,9 @@ func telemetryResource(t *testing.T) pcommon.Resource { return r } -func assertAggregatedMetrics(t *testing.T, expected, actual pmetric.Metrics) bool { +func assertAggregatedMetrics(t *testing.T, expected, actual pmetric.Metrics) { t.Helper() - return assert.NoError(t, pmetrictest.CompareMetrics( + assert.NoError(t, pmetrictest.CompareMetrics( expected, actual, pmetrictest.IgnoreMetricDataPointsOrder(), pmetrictest.IgnoreMetricsOrder(), From 6cd1af3a6dd63a3068fd93ec563564103ac25b4a Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Mon, 16 Dec 2024 17:08:41 +0000 Subject: [PATCH 03/11] go mod tidy --- connector/signaltometricsconnector/go.mod | 1 - 1 file changed, 1 deletion(-) diff --git a/connector/signaltometricsconnector/go.mod b/connector/signaltometricsconnector/go.mod index 9f784bb6f2ec..98a87f165ee9 100644 --- a/connector/signaltometricsconnector/go.mod +++ b/connector/signaltometricsconnector/go.mod @@ -60,7 +60,6 @@ require ( go.opentelemetry.io/collector/internal/fanoutconsumer v0.115.1-0.20241216091623-8ac40a01a5ff // indirect go.opentelemetry.io/collector/pdata/pprofile v0.115.1-0.20241216091623-8ac40a01a5ff // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.0.0-20241215143820-6147243aaaa1 // indirect - go.opentelemetry.io/collector/semconv v0.115.1-0.20241216091623-8ac40a01a5ff // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect From 3d0b035bd720bde36b981ebb4bda28b0bc85c40a Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 17 Dec 2024 16:07:18 +0000 Subject: [PATCH 04/11] Add changelog --- .chloggen/signaltometrics-corelogic.yaml | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/signaltometrics-corelogic.yaml diff --git a/.chloggen/signaltometrics-corelogic.yaml b/.chloggen/signaltometrics-corelogic.yaml new file mode 100644 index 000000000000..6c82c01d5ed0 --- /dev/null +++ b/.chloggen/signaltometrics-corelogic.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: signaltometricsconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add core logic for the signal to metrics connector to make it functional. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35930] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 403baf31efea964606dffe59a0c1ab16d6bed97d Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 18 Dec 2024 09:52:03 +0000 Subject: [PATCH 05/11] make checks --- connector/signaltometricsconnector/go.mod | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connector/signaltometricsconnector/go.mod b/connector/signaltometricsconnector/go.mod index 4cda1a57bdeb..6f9792ee512e 100644 --- a/connector/signaltometricsconnector/go.mod +++ b/connector/signaltometricsconnector/go.mod @@ -4,7 +4,10 @@ go 1.22.0 require ( github.com/lightstep/go-expohisto v1.0.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.116.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.116.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.116.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.116.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.116.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.116.0 @@ -16,6 +19,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.116.0 go.opentelemetry.io/collector/pdata v1.22.0 go.opentelemetry.io/collector/pipeline v0.116.0 + go.opentelemetry.io/collector/semconv v0.116.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -56,7 +60,6 @@ require ( go.opentelemetry.io/collector/internal/fanoutconsumer v0.116.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.116.0 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.116.0 // indirect - go.opentelemetry.io/collector/semconv v0.116.0 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect From 757f32e85cea35f50a8c531def242bb01de392d8 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 2 Jan 2025 14:39:14 +0000 Subject: [PATCH 06/11] make checks --- connector/signaltometricsconnector/go.mod | 1 - connector/signaltometricsconnector/go.sum | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/connector/signaltometricsconnector/go.mod b/connector/signaltometricsconnector/go.mod index 222fbd38cf8d..550fe862ebf6 100644 --- a/connector/signaltometricsconnector/go.mod +++ b/connector/signaltometricsconnector/go.mod @@ -60,7 +60,6 @@ require ( go.opentelemetry.io/collector/internal/fanoutconsumer v0.116.1-0.20241220212031-7c2639723f67 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.116.1-0.20241220212031-7c2639723f67 // indirect go.opentelemetry.io/collector/pipeline/xpipeline v0.116.1-0.20241220212031-7c2639723f67 // indirect - go.opentelemetry.io/collector/semconv v0.116.1-0.20241220212031-7c2639723f67 // indirect go.opentelemetry.io/otel v1.32.0 // indirect go.opentelemetry.io/otel/metric v1.32.0 // indirect go.opentelemetry.io/otel/sdk v1.32.0 // indirect diff --git a/connector/signaltometricsconnector/go.sum b/connector/signaltometricsconnector/go.sum index 2812e9fe307d..da4ec0f73f39 100644 --- a/connector/signaltometricsconnector/go.sum +++ b/connector/signaltometricsconnector/go.sum @@ -8,6 +8,8 @@ github.com/antchfx/xmlquery v1.4.3 h1:f6jhxCzANrWfa93O+NmRWvieVyLs+R2Szfpy+YrZaw github.com/antchfx/xmlquery v1.4.3/go.mod h1:AEPEEPYE9GnA2mj5Ur2L5Q5/2PycJ0N9Fusrx9b12fc= github.com/antchfx/xpath v1.3.3 h1:tmuPQa1Uye0Ym1Zn65vxPgfltWb/Lxu2jeqIGteJSRs= github.com/antchfx/xpath v1.3.3/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= From a6b0e8e91ebd445832bdadb55c0dca6dd553393d Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 7 Jan 2025 10:24:34 +0000 Subject: [PATCH 07/11] Update connector/signaltometricsconnector/factory.go Co-authored-by: Christos Markou --- connector/signaltometricsconnector/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/signaltometricsconnector/factory.go b/connector/signaltometricsconnector/factory.go index a31a3e7b50da..1e6ede1079ee 100644 --- a/connector/signaltometricsconnector/factory.go +++ b/connector/signaltometricsconnector/factory.go @@ -44,7 +44,7 @@ func createTracesToMetrics( c := cfg.(*config.Config) parser, err := ottlspan.NewParser(customottl.SpanFuncs(), set.TelemetrySettings) if err != nil { - return nil, fmt.Errorf("failed to create OTTL statement parser for datapoints: %w", err) + return nil, fmt.Errorf("failed to create OTTL statement parser for spans: %w", err) } metricDefs := make([]model.MetricDef[ottlspan.TransformContext], 0, len(c.Spans)) From 155002fa722abbac241a8727ed34b478b81896c8 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 7 Jan 2025 11:33:42 +0000 Subject: [PATCH 08/11] Update connector/signaltometricsconnector/factory.go Co-authored-by: Christos Markou --- connector/signaltometricsconnector/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/signaltometricsconnector/factory.go b/connector/signaltometricsconnector/factory.go index 1e6ede1079ee..673722dd6ce0 100644 --- a/connector/signaltometricsconnector/factory.go +++ b/connector/signaltometricsconnector/factory.go @@ -100,7 +100,7 @@ func createLogsToMetrics( c := cfg.(*config.Config) parser, err := ottllog.NewParser(customottl.LogFuncs(), set.TelemetrySettings) if err != nil { - return nil, fmt.Errorf("failed to create OTTL statement parser for datapoints: %w", err) + return nil, fmt.Errorf("failed to create OTTL statement parser for logs: %w", err) } metricDefs := make([]model.MetricDef[ottllog.TransformContext], 0, len(c.Logs)) From d768c5d49cc40d8bcaf12725c6b1778e1d267c76 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Tue, 7 Jan 2025 11:39:23 +0000 Subject: [PATCH 09/11] Clarify partial consumption --- connector/signaltometricsconnector/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/connector/signaltometricsconnector/README.md b/connector/signaltometricsconnector/README.md index be38b041b716..d769bb9ff7e2 100644 --- a/connector/signaltometricsconnector/README.md +++ b/connector/signaltometricsconnector/README.md @@ -1,7 +1,9 @@ # Signal to metrics connector Signal to metrics connector produces metrics from all signal types (traces, -logs, or metrics). +logs, or metrics). Partial consumption of signals are allowed i.e. if the component +successfully consumes part of the signal, producing a partial metric, then the +produced metric is forwarded to the next component with a warning log. | Status | | From 5d4c73cc410b538586638446309ebfeb2c48828c Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 8 Jan 2025 11:01:53 +0000 Subject: [PATCH 10/11] remove partial success --- .../signaltometricsconnector/connector.go | 70 ++++++++----------- 1 file changed, 28 insertions(+), 42 deletions(-) diff --git a/connector/signaltometricsconnector/connector.go b/connector/signaltometricsconnector/connector.go index f826304d97e5..b1a9234fe2f5 100644 --- a/connector/signaltometricsconnector/connector.go +++ b/connector/signaltometricsconnector/connector.go @@ -5,7 +5,6 @@ package signaltometricsconnector // import "github.com/open-telemetry/openteleme import ( "context" - "errors" "fmt" "go.opentelemetry.io/collector/component" @@ -44,7 +43,6 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) return nil } - var multiError error processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(td.ResourceSpans().Len()) aggregator := aggregator.NewAggregator[ottlspan.TransformContext](processedMetrics) @@ -69,8 +67,7 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { - multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) - continue + return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) @@ -79,13 +76,15 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) } filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) - multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1)) + if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, 1); err != nil { + return err + } } } } } aggregator.Finalize(sm.spanMetricDefs) - return sm.processNext(ctx, processedMetrics, multiError) + return sm.next.ConsumeMetrics(ctx, processedMetrics) } func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics) error { @@ -93,7 +92,6 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics return nil } - var multiError error processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(m.ResourceMetrics().Len()) aggregator := aggregator.NewAggregator[ottldatapoint.TransformContext](processedMetrics) @@ -114,8 +112,7 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { - multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) - return nil + return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) @@ -135,7 +132,9 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if !ok { continue } - multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + if err := aggregate(dp, filteredDPAttrs); err != nil { + return err + } } case pmetric.MetricTypeSum: dps := metric.Sum().DataPoints() @@ -145,7 +144,9 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if !ok { continue } - multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + if err := aggregate(dp, filteredDPAttrs); err != nil { + return err + } } case pmetric.MetricTypeSummary: dps := metric.Summary().DataPoints() @@ -155,7 +156,9 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if !ok { continue } - multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + if err := aggregate(dp, filteredDPAttrs); err != nil { + return err + } } case pmetric.MetricTypeHistogram: dps := metric.Histogram().DataPoints() @@ -165,7 +168,9 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if !ok { continue } - multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + if err := aggregate(dp, filteredDPAttrs); err != nil { + return err + } } case pmetric.MetricTypeExponentialHistogram: dps := metric.ExponentialHistogram().DataPoints() @@ -175,17 +180,19 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics if !ok { continue } - multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs)) + if err := aggregate(dp, filteredDPAttrs); err != nil { + return err + } } case pmetric.MetricTypeEmpty: - multiError = errors.Join(multiError, fmt.Errorf("metric %q: invalid metric type: %v", metric.Name(), metric.Type())) + continue } } } } } aggregator.Finalize(sm.dpMetricDefs) - return sm.processNext(ctx, processedMetrics, multiError) + return sm.next.ConsumeMetrics(ctx, processedMetrics) } func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) error { @@ -193,7 +200,6 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro return nil } - var multiError error processedMetrics := pmetric.NewMetrics() processedMetrics.ResourceMetrics().EnsureCapacity(logs.ResourceLogs().Len()) aggregator := aggregator.NewAggregator[ottllog.TransformContext](processedMetrics) @@ -217,41 +223,21 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro if md.Conditions != nil { match, err := md.Conditions.Eval(ctx, tCtx) if err != nil { - multiError = errors.Join(multiError, fmt.Errorf("failed to evaluate conditions, skipping: %w", err)) - continue + return fmt.Errorf("failed to evaluate conditions: %w", err) } if !match { sm.logger.Debug("condition not matched, skipping", zap.String("name", md.Key.Name)) continue } } - filteredResAttrs := md.FilterResourceAttributes(resourceAttrs) - multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1)) + if err := aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1); err != nil { + return err + } } } } } aggregator.Finalize(sm.logMetricDefs) - return sm.processNext(ctx, processedMetrics, multiError) -} - -// processNext is a helper method for all the Consume* methods to do error handling, -// logging, and sending the processed metrics to the next consumer in the pipeline. -func (sm *signalToMetrics) processNext(ctx context.Context, m pmetric.Metrics, err error) error { - if err != nil { - dpCount := m.DataPointCount() - if dpCount == 0 { - // No signals were consumed so return an error - return fmt.Errorf("failed to consume signal: %w", err) - } - // At least some signals were partially consumed, so log the error - // and pass the processed metrics to the next consumer. - sm.logger.Warn( - "failed to consume all signals, some signals were partially processed", - zap.Error(err), - zap.Int("successful_data_points", dpCount), - ) - } - return sm.next.ConsumeMetrics(ctx, m) + return sm.next.ConsumeMetrics(ctx, processedMetrics) } From 07f1cf9efcd202798bd6849d50c1a42889b37186 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 8 Jan 2025 11:02:55 +0000 Subject: [PATCH 11/11] Remove deprecated readme --- connector/signaltometricsconnector/README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/connector/signaltometricsconnector/README.md b/connector/signaltometricsconnector/README.md index d769bb9ff7e2..be38b041b716 100644 --- a/connector/signaltometricsconnector/README.md +++ b/connector/signaltometricsconnector/README.md @@ -1,9 +1,7 @@ # Signal to metrics connector Signal to metrics connector produces metrics from all signal types (traces, -logs, or metrics). Partial consumption of signals are allowed i.e. if the component -successfully consumes part of the signal, producing a partial metric, then the -produced metric is forwarded to the next component with a warning log. +logs, or metrics). | Status | |