Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore] Fix usage of setupTestTelemetry in filter processor #36393

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/collector/consumer v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/consumer/consumertest v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/pdata v1.19.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/processor v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/collector/processor/processortest v0.113.1-0.20241115165626-8b99b8023ca3
go.opentelemetry.io/otel v1.32.0
Expand Down Expand Up @@ -64,7 +65,6 @@ require (
go.opentelemetry.io/collector/featuregate v1.19.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/pipeline v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/processor/processorprofiles v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/collector/semconv v0.113.1-0.20241115165626-8b99b8023ca3 // indirect
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions processor/filterprocessor/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/multierr"
Expand All @@ -22,7 +23,7 @@ import (

type filterLogProcessor struct {
skipExpr expr.BoolExpr[ottllog.TransformContext]
telemetry *filterProcessorTelemetry
telemetry *filterTelemetry
logger *zap.Logger
}

Expand All @@ -31,7 +32,7 @@ func newFilterLogsProcessor(set processor.Settings, cfg *Config) (*filterLogProc
logger: set.Logger,
}

fpt, err := newfilterProcessorTelemetry(set)
fpt, err := newFilterTelemetry(set, pipeline.SignalLogs)
if err != nil {
return nil, fmt.Errorf("error creating filter processor telemetry: %w", err)
}
Expand Down Expand Up @@ -92,7 +93,7 @@ func (flp *filterLogProcessor) processLogs(ctx context.Context, ld plog.Logs) (p
})

logCountAfterFilters := ld.LogRecordCount()
flp.telemetry.record(triggerLogsDropped, int64(logCountBeforeFilters-logCountAfterFilters))
flp.telemetry.record(ctx, int64(logCountBeforeFilters-logCountAfterFilters))

if errors != nil {
flp.logger.Error("failed processing logs", zap.Error(errors))
Expand Down
1 change: 1 addition & 0 deletions processor/filterprocessor/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ func TestFilterLogProcessorTelemetry(t *testing.T) {
}

tel.assertMetrics(t, want)
require.NoError(t, tel.Shutdown(context.Background()))
}

func constructLogs() plog.Logs {
Expand Down
7 changes: 4 additions & 3 deletions processor/filterprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/multierr"
Expand All @@ -29,7 +30,7 @@ type filterMetricProcessor struct {
skipResourceExpr expr.BoolExpr[ottlresource.TransformContext]
skipMetricExpr expr.BoolExpr[ottlmetric.TransformContext]
skipDataPointExpr expr.BoolExpr[ottldatapoint.TransformContext]
telemetry *filterProcessorTelemetry
telemetry *filterTelemetry
logger *zap.Logger
}

Expand All @@ -39,7 +40,7 @@ func newFilterMetricProcessor(set processor.Settings, cfg *Config) (*filterMetri
logger: set.Logger,
}

fpt, err := newfilterProcessorTelemetry(set)
fpt, err := newFilterTelemetry(set, pipeline.SignalMetrics)
if err != nil {
return nil, fmt.Errorf("error creating filter processor telemetry: %w", err)
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func (fmp *filterMetricProcessor) processMetrics(ctx context.Context, md pmetric
})

metricDataPointCountAfterFilters := md.DataPointCount()
fmp.telemetry.record(triggerMetricDataPointsDropped, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters))
fmp.telemetry.record(ctx, int64(metricDataPointCountBeforeFilters-metricDataPointCountAfterFilters))

if errors != nil {
fmp.logger.Error("failed processing metrics", zap.Error(errors))
Expand Down
167 changes: 4 additions & 163 deletions processor/filterprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,90 +369,21 @@ func TestFilterMetricProcessor(t *testing.T) {

func TestFilterMetricProcessorTelemetry(t *testing.T) {
tel := setupTestTelemetry()
next := new(consumertest.MetricsSink)
cfg := &Config{
Metrics: MetricFilters{
MetricConditions: []string{
"name==\"metric1\"",
},
},
}
factory := NewFactory()
fmp, err := factory.CreateMetrics(
context.Background(),
fmp, err := newFilterMetricProcessor(
tel.NewSettings(),
cfg,
next,
)
assert.NotNil(t, fmp)
assert.NoError(t, err)

caps := fmp.Capabilities()
assert.True(t, caps.MutatesData)
ctx := context.Background()
assert.NoError(t, fmp.Start(ctx, nil))

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"foo", "bar"},
resourceAttributes: map[string]any{
"attr1": "attr1/val1",
},
},
}))
assert.NoError(t, err)

want := []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 0,
Attributes: attribute.NewSet(attribute.String("filter", "filter")),
},
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}

tel.assertMetrics(t, want)

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
_, err = fmp.processMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"metric1", "metric2"},
resourceAttributes: map[string]any{
Expand All @@ -462,7 +393,7 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
}))
assert.NoError(t, err)

want = []metricdata.Metrics{
want := []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Expand All @@ -478,99 +409,9 @@ func TestFilterMetricProcessorTelemetry(t *testing.T) {
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}
tel.assertMetrics(t, want)

err = fmp.ConsumeMetrics(context.Background(), testResourceMetrics([]metricWithResource{
{
metricNames: []string{"metric1"},
resourceAttributes: map[string]any{
"attr1": "attr1/val1",
},
},
}))
assert.NoError(t, err)

want = []metricdata.Metrics{
{
Name: "otelcol_processor_filter_datapoints.filtered",
Description: "Number of metric data points dropped by the filter processor",
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("filter", "filter")),
},
},
},
},
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 5,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "filter"), attribute.String("otel.signal", "metrics")),
},
},
},
},
}
tel.assertMetrics(t, want)

assert.NoError(t, fmp.Shutdown(ctx))
require.NoError(t, tel.Shutdown(context.Background()))
}

func testResourceMetrics(mwrs []metricWithResource) pmetric.Metrics {
Expand Down
51 changes: 23 additions & 28 deletions processor/filterprocessor/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,45 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"fmt"

"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor/internal/metadata"
)

type trigger int

const (
triggerMetricDataPointsDropped trigger = iota
triggerLogsDropped
triggerSpansDropped
)

type filterProcessorTelemetry struct {
exportCtx context.Context

processorAttr []attribute.KeyValue

telemetryBuilder *metadata.TelemetryBuilder
type filterTelemetry struct {
attr metric.MeasurementOption
counter metric.Int64Counter
}

func newfilterProcessorTelemetry(set processor.Settings) (*filterProcessorTelemetry, error) {
func newFilterTelemetry(set processor.Settings, signal pipeline.Signal) (*filterTelemetry, error) {
telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
if err != nil {
return nil, err
}

return &filterProcessorTelemetry{
processorAttr: []attribute.KeyValue{attribute.String(metadata.Type.String(), set.ID.String())},
exportCtx: context.Background(),
telemetryBuilder: telemetryBuilder,
var counter metric.Int64Counter
switch signal {
case pipeline.SignalMetrics:
counter = telemetryBuilder.ProcessorFilterDatapointsFiltered
case pipeline.SignalLogs:
counter = telemetryBuilder.ProcessorFilterLogsFiltered
case pipeline.SignalTraces:
counter = telemetryBuilder.ProcessorFilterSpansFiltered
default:
return nil, fmt.Errorf("unsupported signal type: %v", signal)
}

return &filterTelemetry{
attr: metric.WithAttributeSet(attribute.NewSet(attribute.String(metadata.Type.String(), set.ID.String()))),
counter: counter,
}, nil
}

func (fpt *filterProcessorTelemetry) record(trigger trigger, dropped int64) {
switch trigger {
case triggerMetricDataPointsDropped:
fpt.telemetryBuilder.ProcessorFilterDatapointsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerLogsDropped:
fpt.telemetryBuilder.ProcessorFilterLogsFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
case triggerSpansDropped:
fpt.telemetryBuilder.ProcessorFilterSpansFiltered.Add(fpt.exportCtx, dropped, metric.WithAttributes(fpt.processorAttr...))
}
func (fpt *filterTelemetry) record(ctx context.Context, dropped int64) {
fpt.counter.Add(ctx, dropped, fpt.attr)
}
Loading