Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[resourceprocessor] add support for profile signal type #36208

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3ed5dcd
[resourceprocessor] add support for profile signal type
bacherfl Nov 5, 2024
b8a5079
update go deps
bacherfl Nov 5, 2024
97cb116
fix linting and other checks
bacherfl Nov 5, 2024
56a6450
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Nov 6, 2024
fc6be7a
fix failing unit test
bacherfl Nov 6, 2024
0504632
fix linting
bacherfl Nov 6, 2024
5353168
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Nov 7, 2024
ec30145
go mod tidy
bacherfl Nov 7, 2024
b15f297
fix profiles stability level
bacherfl Nov 11, 2024
b2fb150
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Nov 19, 2024
459ddc2
adapt to changes in main
bacherfl Nov 19, 2024
ea947e6
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Dec 6, 2024
ec0696d
fix merge conflicts
bacherfl Dec 6, 2024
4525294
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Dec 10, 2024
b21942f
fix merge conflicts
bacherfl Dec 10, 2024
64d0294
fix formatting
bacherfl Dec 10, 2024
770f8ef
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Dec 19, 2024
13ccb38
fix merge conflicts
bacherfl Dec 19, 2024
34485ac
fix linting
bacherfl Dec 19, 2024
89ef6f7
fix linting
bacherfl Dec 19, 2024
267dd71
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 7, 2025
f4307ea
fix merge conflicts
bacherfl Jan 7, 2025
d547fcc
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 17, 2025
e60f400
merge main
bacherfl Jan 17, 2025
1a33e67
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 20, 2025
f2519f7
fix merge conflicts
bacherfl Jan 20, 2025
a74f268
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 23, 2025
d76dea0
fix merge conflicts
bacherfl Jan 23, 2025
b7ebf56
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 23, 2025
5165013
fix dependencies
bacherfl Jan 23, 2025
b695f23
Merge branch 'main' into feat/35979/rp-profiles
bacherfl Jan 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/resource-processor-add-profiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: resourceprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for profile signal type

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [359979]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 3 additions & 1 deletion processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, metrics, logs |
| Stability | [development]: profiles |
| | [beta]: traces, metrics, logs |
| Distributions | [core], [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fresource%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fresource) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fresource%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fresource) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@dmitryax](https://www.github.com/dmitryax) |

[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand Down
33 changes: 29 additions & 4 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor/internal/metadata"
Expand All @@ -19,12 +22,14 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true}

// NewFactory returns a new factory for the Resource processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
return xprocessor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(createTracesProcessor, metadata.TracesStability),
processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
processor.WithLogs(createLogsProcessor, metadata.LogsStability))
xprocessor.WithTraces(createTracesProcessor, metadata.TracesStability),
xprocessor.WithMetrics(createMetricsProcessor, metadata.MetricsStability),
xprocessor.WithLogs(createLogsProcessor, metadata.LogsStability),
xprocessor.WithProfiles(createProfilesProcessor, metadata.ProfilesStability),
)
}

// Note: This isn't a valid configuration because the processor would do no work.
Expand Down Expand Up @@ -91,3 +96,23 @@ func createLogsProcessor(
proc.processLogs,
processorhelper.WithCapabilities(processorCapabilities))
}

func createProfilesProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer xconsumer.Profiles,
) (xprocessor.Profiles, error) {
attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: cfg.(*Config).AttributesActions})
if err != nil {
return nil, err
}
proc := resourceProcessor{logger: set.Logger, attrProc: attrProc}
return xprocessorhelper.NewProfiles(
ctx,
set,
cfg,
nextConsumer,
proc.processProfiles,
xprocessorhelper.WithCapabilities(processorCapabilities))
}
15 changes: 15 additions & 0 deletions processor/resourceprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/processor/xprocessor"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
)
Expand All @@ -37,6 +38,14 @@ func TestCreateProcessor(t *testing.T) {
mp, err := factory.CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, mp)

lp, err := factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, lp)

pp, err := factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, pp)
}

func TestInvalidAttributeActions(t *testing.T) {
Expand All @@ -52,4 +61,10 @@ func TestInvalidAttributeActions(t *testing.T) {

_, err = factory.CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, nil)
assert.Error(t, err)

_, err = factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, nil)
assert.Error(t, err)

_, err = factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), cfg, nil)
assert.Error(t, err)
}
7 changes: 4 additions & 3 deletions processor/resourceprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ require (
go.opentelemetry.io/collector/confmap v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/consumer v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/processor v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3
go.opentelemetry.io/collector/processor/xprocessor v0.118.1-0.20250121185328-fbefb22cc2b3
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)
Expand All @@ -40,11 +44,8 @@ require (
go.opentelemetry.io/collector/client v1.24.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions processor/resourceprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions processor/resourceprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ type: resource
status:
class: processor
stability:
development: [profiles]
beta: [traces, metrics, logs]
distributions: [core, contrib, k8s]
codeowners:
Expand Down
9 changes: 9 additions & 0 deletions processor/resourceprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

Expand Down Expand Up @@ -42,3 +43,11 @@ func (rp *resourceProcessor) processLogs(ctx context.Context, ld plog.Logs) (plo
}
return ld, nil
}

func (rp *resourceProcessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) {
rps := pd.ResourceProfiles()
for i := 0; i < rps.Len(); i++ {
rp.attrProc.Process(ctx, rp.logger, rps.At(i).Resource().Attributes())
}
return pd, nil
}
43 changes: 43 additions & 0 deletions processor/resourceprocessor/resource_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor/processortest"
"go.opentelemetry.io/collector/processor/xprocessor"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
Expand Down Expand Up @@ -129,6 +132,20 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) {
logs := tln.AllLogs()
require.Len(t, logs, 1)
assert.NoError(t, plogtest.CompareLogs(wantLogData, logs[0]))

// Test profiles consumer
tpn := new(consumertest.ProfilesSink)
rpp, err := factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), tt.config, tpn)
require.NoError(t, err)
assert.True(t, rpp.Capabilities().MutatesData)

sourceProfileData := generateProfileData(tt.sourceAttributes)
wantProfileData := generateProfileData(tt.wantAttributes)
err = rpp.ConsumeProfiles(context.Background(), sourceProfileData)
require.NoError(t, err)
profiles := tpn.AllProfiles()
require.Len(t, profiles, 1)
compareProfileAttributes(t, wantProfileData, sourceProfileData)
})
}
}
Expand Down Expand Up @@ -168,3 +185,29 @@ func generateLogData(attributes map[string]string) plog.Logs {
}
return ld
}

func generateProfileData(attributes map[string]string) pprofile.Profiles {
p := pprofile.NewProfiles()
rp := p.ResourceProfiles().AppendEmpty()

for k, v := range attributes {
rp.Resource().Attributes().PutStr(k, v)
}
return p
}

func compareProfileAttributes(t *testing.T, expected pprofile.Profiles, got pprofile.Profiles) {
require.Equal(t, expected.ResourceProfiles().Len(), got.ResourceProfiles().Len())

for i := 0; i < expected.ResourceProfiles().Len(); i++ {
expectedResourceProfile := expected.ResourceProfiles().At(i)
gotResourceProfile := got.ResourceProfiles().At(i)

expectedResourceProfile.Resource().Attributes().Range(func(k string, v pcommon.Value) bool {
get, ok := gotResourceProfile.Resource().Attributes().Get(k)
require.True(t, ok)
require.Equal(t, v, get)
return true
})
}
}
Loading