From c9d37d68e93728220b6f300161e87d620b002aa4 Mon Sep 17 00:00:00 2001 From: odubajDT Date: Wed, 8 Jan 2025 09:54:53 +0100 Subject: [PATCH 1/2] [pkg/ottl] enhance flatten editor to resolve attribute key conflicts Signed-off-by: odubajDT --- .chloggen/flatten-conflict.yaml | 27 ++ pkg/ottl/e2e/e2e_test.go | 46 ++++ pkg/ottl/ottlfuncs/README.md | 44 +++- pkg/ottl/ottlfuncs/func_flatten.go | 61 +++-- pkg/ottl/ottlfuncs/func_flatten_test.go | 320 +++++++++++++++++++++++- 5 files changed, 477 insertions(+), 21 deletions(-) create mode 100644 .chloggen/flatten-conflict.yaml diff --git a/.chloggen/flatten-conflict.yaml b/.chloggen/flatten-conflict.yaml new file mode 100644 index 000000000000..8706963cc738 --- /dev/null +++ b/.chloggen/flatten-conflict.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/ottl + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Enhance flatten() editor to resolve attribute key conflicts by adding a number suffix to the conflicting keys." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35793] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/pkg/ottl/e2e/e2e_test.go b/pkg/ottl/e2e/e2e_test.go index a11d78a72305..8b09291ea104 100644 --- a/pkg/ottl/e2e/e2e_test.go +++ b/pkg/ottl/e2e/e2e_test.go @@ -59,16 +59,21 @@ func Test_e2e_editors(t *testing.T) { tCtx.GetLogRecord().Attributes().Remove("total.string") tCtx.GetLogRecord().Attributes().Remove("foo") tCtx.GetLogRecord().Attributes().Remove("things") + tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1") + tCtx.GetLogRecord().Attributes().Remove("conflict") }, }, { statement: `flatten(attributes)`, want: func(tCtx ottllog.TransformContext) { tCtx.GetLogRecord().Attributes().Remove("foo") + tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1") + tCtx.GetLogRecord().Attributes().Remove("conflict") tCtx.GetLogRecord().Attributes().PutStr("foo.bar", "pass") tCtx.GetLogRecord().Attributes().PutStr("foo.flags", "pass") tCtx.GetLogRecord().Attributes().PutStr("foo.slice.0", "val") tCtx.GetLogRecord().Attributes().PutStr("foo.nested.test", "pass") + tCtx.GetLogRecord().Attributes().PutStr("conflict.conflict1.conflict2", "nopass") tCtx.GetLogRecord().Attributes().Remove("things") tCtx.GetLogRecord().Attributes().PutStr("things.0.name", "foo") @@ -93,6 +98,7 @@ func Test_e2e_editors(t *testing.T) { m.PutStr("test.foo.flags", "pass") m.PutStr("test.foo.slice.0", "val") m.PutStr("test.foo.nested.test", "pass") + m.PutStr("test.conflict.conflict1.conflict2", "nopass") m.PutStr("test.things.0.name", "foo") m.PutInt("test.things.0.value", 2) @@ -101,6 +107,34 @@ func Test_e2e_editors(t *testing.T) { m.CopyTo(tCtx.GetLogRecord().Attributes()) }, }, + { + statement: `flatten(attributes, "test", resolveConflicts=true)`, + want: func(tCtx ottllog.TransformContext) { + m := pcommon.NewMap() + m.PutStr("test.http.method", "get") + m.PutStr("test.http.path", "/health") + m.PutStr("test.http.url", "http://localhost/health") + m.PutStr("test.flags", "A|B|C") + m.PutStr("test.total.string", "123456789") + m.PutStr("test.foo.bar", "pass") + m.PutStr("test.foo.flags", "pass") + m.PutStr("test.foo.bar", "pass") + m.PutStr("test.foo.flags", "pass") + m.PutStr("test.foo.slice", "val") + m.PutStr("test.foo.nested.test", "pass") + + m.PutStr("test.conflict.conflict1.conflict2", "pass") + m.PutStr("test.conflict.conflict1.conflict2.0", "nopass") + + m.PutStr("test.things.0.name", "foo") + m.PutInt("test.things.0.value", 2) + + m.PutStr("test.things.1.name", "bar") + m.PutInt("test.things.1.value", 5) + + m.CopyTo(tCtx.GetLogRecord().Attributes()) + }, + }, { statement: `flatten(attributes, depth=1)`, want: func(tCtx ottllog.TransformContext) { @@ -115,6 +149,9 @@ func Test_e2e_editors(t *testing.T) { m.PutStr("foo.bar", "pass") m.PutStr("foo.flags", "pass") m.PutEmptySlice("foo.slice").AppendEmpty().SetStr("val") + m.PutStr("conflict.conflict1.conflict2", "nopass") + mm := m.PutEmptyMap("conflict.conflict1") + mm.PutStr("conflict2", "pass") m1 := m.PutEmptyMap("things.0") m1.PutStr("name", "foo") @@ -137,6 +174,8 @@ func Test_e2e_editors(t *testing.T) { tCtx.GetLogRecord().Attributes().Remove("http.url") tCtx.GetLogRecord().Attributes().Remove("foo") tCtx.GetLogRecord().Attributes().Remove("things") + tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1") + tCtx.GetLogRecord().Attributes().Remove("conflict") }, }, { @@ -152,6 +191,8 @@ func Test_e2e_editors(t *testing.T) { tCtx.GetLogRecord().Attributes().Remove("flags") tCtx.GetLogRecord().Attributes().Remove("foo") tCtx.GetLogRecord().Attributes().Remove("things") + tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1") + tCtx.GetLogRecord().Attributes().Remove("conflict") }, }, { @@ -1287,6 +1328,11 @@ func constructLogTransformContextEditors() ottllog.TransformContext { logRecord.Attributes().PutStr("http.url", "http://localhost/health") logRecord.Attributes().PutStr("flags", "A|B|C") logRecord.Attributes().PutStr("total.string", "123456789") + mm := logRecord.Attributes().PutEmptyMap("conflict") + mm1 := mm.PutEmptyMap("conflict1") + mm1.PutStr("conflict2", "pass") + mmm := logRecord.Attributes().PutEmptyMap("conflict.conflict1") + mmm.PutStr("conflict2", "nopass") m := logRecord.Attributes().PutEmptyMap("foo") m.PutStr("bar", "pass") m.PutStr("flags", "pass") diff --git a/pkg/ottl/ottlfuncs/README.md b/pkg/ottl/ottlfuncs/README.md index ac86e36fc297..4531a86c3e8c 100644 --- a/pkg/ottl/ottlfuncs/README.md +++ b/pkg/ottl/ottlfuncs/README.md @@ -126,11 +126,12 @@ Examples: ### flatten -`flatten(target, Optional[prefix], Optional[depth])` +`flatten(target, Optional[prefix], Optional[depth], Optional[resolveConflicts])` The `flatten` function flattens a `pcommon.Map` by moving items from nested maps to the root. -`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int. +`target` is a path expression to a `pcommon.Map` type field. `prefix` is an optional string. `depth` is an optional non-negative int, `resolveConflicts` resolves the potential conflicts in the map keys by adding a number suffix starting with `0` from the first duplicated key. + For example, the following map @@ -199,6 +200,42 @@ the result would be A `depth` of `0` means that no flattening will occur. +If `resolveConflicts` is set to `true`, conflicts within the map will be resolved + +```json +{ + "address": { + "street": { + "number": "first", + }, + "house": "1234", + }, + "address.street": { + "number": ["second", "third"], + }, + "address.street.number": "fourth", + "occupants": [ + "user 1", + "user 2", + ], +} +``` + +the result would be + +```json +{ + "address.street.number": "first", + "address.house": "1234", + "address.street.number.0": "second", + "address.street.number.1": "third", + "occupants": "user 1", + "occupants.0": "user 2", + "address.street.number.2": "fourth", +} + +``` + Examples: - `flatten(attributes)` @@ -210,6 +247,9 @@ Examples: - `flatten(body, depth=2)` +- `flatten(body, resolveConflicts=true)` + + ### keep_keys `keep_keys(target, keys[])` diff --git a/pkg/ottl/ottlfuncs/func_flatten.go b/pkg/ottl/ottlfuncs/func_flatten.go index b651cce4ce5f..fce1ab3594a9 100644 --- a/pkg/ottl/ottlfuncs/func_flatten.go +++ b/pkg/ottl/ottlfuncs/func_flatten.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "math" + "strconv" "go.opentelemetry.io/collector/pdata/pcommon" @@ -14,9 +15,10 @@ import ( ) type FlattenArguments[K any] struct { - Target ottl.PMapGetter[K] - Prefix ottl.Optional[string] - Depth ottl.Optional[int64] + Target ottl.PMapGetter[K] + Prefix ottl.Optional[string] + Depth ottl.Optional[int64] + ResolveConflicts ottl.Optional[bool] } func NewFlattenFactory[K any]() ottl.Factory[K] { @@ -30,10 +32,10 @@ func createFlattenFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments) return nil, fmt.Errorf("FlattenFactory args must be of type *FlattenArguments[K]") } - return flatten(args.Target, args.Prefix, args.Depth) + return flatten(args.Target, args.Prefix, args.Depth, args.ResolveConflicts) } -func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64]) (ottl.ExprFunc[K], error) { +func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.Optional[int64], c ottl.Optional[bool]) (ottl.ExprFunc[K], error) { depth := int64(math.MaxInt64) if !d.IsEmpty() { depth = d.Get() @@ -47,6 +49,11 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O prefix = p.Get() } + conflict := false + if !c.IsEmpty() { + conflict = c.Get() + } + return func(ctx context.Context, tCtx K) (any, error) { m, err := target.Get(ctx, tCtx) if err != nil { @@ -54,45 +61,67 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O } result := pcommon.NewMap() - flattenMap(m, result, prefix, 0, depth) + existingKeys := map[string]int{} + flattenMap(m, result, prefix, 0, depth, conflict, existingKeys) result.MoveTo(m) return nil, nil }, nil } -func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64) { +func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64, conflict bool, existingKeys map[string]int) { if len(prefix) > 0 { prefix += "." } m.Range(func(k string, v pcommon.Value) bool { - return flattenValue(k, v, currentDepth, maxDepth, result, prefix) + return flattenValue(k, v, currentDepth, maxDepth, result, prefix, conflict, existingKeys) }) } -func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64) { +func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64, conflict bool, existingKeys map[string]int) { for i := 0; i < s.Len(); i++ { - flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix) + flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix, conflict, existingKeys) } } -func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string) bool { +func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string, conflict bool, existingKeys map[string]int) bool { switch { case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth: - flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth) + flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth, conflict, existingKeys) case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth: for i := 0; i < v.Slice().Len(); i++ { switch { case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth: - flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth) + flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys) case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth: - flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth) + flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys) default: - v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", prefix+k, i))) + key := prefix + k + if conflict { + handleConflict(existingKeys, key, v.Slice().At(i), &result) + } else { + v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", key, i))) + } } } default: - v.CopyTo(result.PutEmpty(prefix + k)) + key := prefix + k + if conflict { + handleConflict(existingKeys, key, v, &result) + } else { + v.CopyTo(result.PutEmpty(key)) + } } return true } + +func handleConflict(existingKeys map[string]int, key string, v pcommon.Value, result *pcommon.Map) { + if _, exists := result.Get(key); exists { + newKey := key + "." + strconv.Itoa(existingKeys[key]) + existingKeys[key]++ + v.CopyTo(result.PutEmpty(newKey)) + } else { + existingKeys[key] = 0 + v.CopyTo(result.PutEmpty(key)) + } +} diff --git a/pkg/ottl/ottlfuncs/func_flatten_test.go b/pkg/ottl/ottlfuncs/func_flatten_test.go index bbd2715e18a3..8dc8eb3e9d7e 100644 --- a/pkg/ottl/ottlfuncs/func_flatten_test.go +++ b/pkg/ottl/ottlfuncs/func_flatten_test.go @@ -5,6 +5,7 @@ package ottlfuncs import ( "context" + "reflect" "testing" "github.com/stretchr/testify/assert" @@ -20,6 +21,7 @@ func Test_flatten(t *testing.T) { prefix ottl.Optional[string] depth ottl.Optional[int64] expected map[string]any + conflict bool }{ { name: "simple", @@ -202,6 +204,136 @@ func Test_flatten(t *testing.T) { }, }, }, + { + name: "simple - conflict on", + target: map[string]any{ + "name": "test", + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "name": "test", + }, + conflict: true, + }, + { + name: "nested map - conflict on", + target: map[string]any{ + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "address.street": "first", + "address.house": int64(1234), + }, + conflict: true, + }, + { + name: "nested slice - conflict on", + target: map[string]any{ + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "occupants": "user 1", + "occupants.0": "user 2", + }, + conflict: true, + }, + { + name: "combination - conflict on", + target: map[string]any{ + "name": "test", + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "name": "test", + "address.street": "first", + "address.house": int64(1234), + "occupants": "user 1", + "occupants.0": "user 2", + }, + conflict: true, + }, + { + name: "deep nesting - conflict on", + target: map[string]any{ + "1": map[string]any{ + "2": map[string]any{ + "3": map[string]any{ + "4": "5", + }, + }, + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "1.2.3.4": "5", + }, + conflict: true, + }, + { + name: "use prefix - conflict on", + target: map[string]any{ + "name": "test", + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.NewTestingOptional[string]("app"), + depth: ottl.Optional[int64]{}, + expected: map[string]any{ + "app.name": "test", + "app.address.street": "first", + "app.address.house": int64(1234), + "app.occupants": "user 1", + "app.occupants.0": "user 2", + }, + conflict: true, + }, + { + name: "max depth - conflict on", + target: map[string]any{ + "0": map[string]any{ + "1": map[string]any{ + "2": map[string]any{ + "3": "value", + }, + }, + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.NewTestingOptional[int64](2), + expected: map[string]any{ + "0.1.2": map[string]any{ + "3": "value", + }, + }, + conflict: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -214,7 +346,7 @@ func Test_flatten(t *testing.T) { }, } - exprFunc, err := flatten[any](target, tt.prefix, tt.depth) + exprFunc, err := flatten[any](target, tt.prefix, tt.depth, ottl.NewTestingOptional[bool](tt.conflict)) assert.NoError(t, err) _, err = exprFunc(nil, nil) assert.NoError(t, err) @@ -224,13 +356,166 @@ func Test_flatten(t *testing.T) { } } +func Test_flatten_undeterministic(t *testing.T) { + tests := []struct { + name string + target map[string]any + prefix ottl.Optional[string] + depth ottl.Optional[int64] + expectedKeys []string + expectedValues []any + conflict bool + }{ + { + name: "conflicting map - conflict on", + target: map[string]any{ + "address": map[string]any{ + "street": map[string]any{ + "house": int64(1234), + }, + }, + "address.street": map[string]any{ + "house": int64(1235), + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expectedKeys: []string{ + "address.street.house", + "address.street.house.0", + }, + expectedValues: []any{ + int64(1234), + int64(1235), + }, + conflict: true, + }, + { + name: "conflicting slice - conflict on", + target: map[string]any{ + "address": map[string]any{ + "street": []any{"first"}, + "house": int64(1234), + }, + "address.street": []any{"second"}, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expectedKeys: []string{ + "address.street", + "address.house", + "address.street.0", + }, + expectedValues: []any{ + int64(1234), + "second", + "first", + }, + conflict: true, + }, + { + name: "conflicting map with nested slice - conflict on", + target: map[string]any{ + "address": map[string]any{ + "street": "first", + "house": int64(1234), + }, + "address.street": "second", + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expectedKeys: []string{ + "address.street", + "address.house", + "address.street.0", + "occupants", + "occupants.0", + }, + expectedValues: []any{ + int64(1234), + "second", + "first", + "user 1", + "user 2", + }, + conflict: true, + }, + { + name: "conflicting map with nested slice in conflicting item - conflict on", + target: map[string]any{ + "address": map[string]any{ + "street": map[string]any{ + "number": "first", + }, + "house": int64(1234), + }, + "address.street": map[string]any{ + "number": []any{"second", "third"}, + }, + "address.street.number": "fourth", + "occupants": []any{ + "user 1", + "user 2", + }, + }, + prefix: ottl.Optional[string]{}, + depth: ottl.Optional[int64]{}, + expectedKeys: []string{ + "address.street.number", + "address.house", + "address.street.number.0", + "address.street.number.1", + "occupants", + "occupants.0", + "address.street.number.2", + }, + expectedValues: []any{ + int64(1234), + "second", + "first", + "third", + "fourth", + "user 1", + "user 2", + }, + conflict: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := pcommon.NewMap() + err := m.FromRaw(tt.target) + assert.NoError(t, err) + target := ottl.StandardPMapGetter[any]{ + Getter: func(_ context.Context, _ any) (any, error) { + return m, nil + }, + } + + exprFunc, err := flatten[any](target, tt.prefix, tt.depth, ottl.NewTestingOptional[bool](tt.conflict)) + assert.NoError(t, err) + _, err = exprFunc(nil, nil) + assert.NoError(t, err) + + keys, val := extractKeysAndValues(m.AsRaw()) + + assert.True(t, compareSlices(keys, tt.expectedKeys)) + assert.True(t, compareSlices(val, tt.expectedValues)) + }) + } +} + func Test_flatten_bad_target(t *testing.T) { target := &ottl.StandardPMapGetter[any]{ Getter: func(_ context.Context, _ any) (any, error) { return 1, nil }, } - exprFunc, err := flatten[any](target, ottl.Optional[string]{}, ottl.Optional[int64]{}) + exprFunc, err := flatten[any](target, ottl.Optional[string]{}, ottl.Optional[int64]{}, ottl.NewTestingOptional[bool](false)) assert.NoError(t, err) _, err = exprFunc(nil, nil) assert.Error(t, err) @@ -258,8 +543,37 @@ func Test_flatten_bad_depth(t *testing.T) { return pcommon.NewMap(), nil }, } - _, err := flatten[any](target, ottl.Optional[string]{}, tt.depth) + _, err := flatten[any](target, ottl.Optional[string]{}, tt.depth, ottl.NewTestingOptional[bool](false)) assert.Error(t, err) }) } } + +func extractKeysAndValues(m map[string]any) ([]string, []any) { + keys := make([]string, 0, len(m)) + values := make([]any, 0, len(m)) + for key, value := range m { + keys = append(keys, key) + values = append(values, value) + } + return keys, values +} + +func compareSlices[K string | any](a, b []K) bool { + if len(a) != len(b) { + return false + } + + aMap := make(map[any]int) + bMap := make(map[any]int) + + for _, item := range a { + aMap[item]++ + } + + for _, item := range b { + bMap[item]++ + } + + return reflect.DeepEqual(aMap, bMap) +} From d443535c8b3218bf3502eecd13bf55694c19c7fb Mon Sep 17 00:00:00 2001 From: odubajDT Date: Fri, 10 Jan 2025 07:25:50 +0100 Subject: [PATCH 2/2] refactor implementation Signed-off-by: odubajDT --- pkg/ottl/ottlfuncs/func_flatten.go | 77 ++++++++++++++++++------------ 1 file changed, 46 insertions(+), 31 deletions(-) diff --git a/pkg/ottl/ottlfuncs/func_flatten.go b/pkg/ottl/ottlfuncs/func_flatten.go index fce1ab3594a9..62077441ce88 100644 --- a/pkg/ottl/ottlfuncs/func_flatten.go +++ b/pkg/ottl/ottlfuncs/func_flatten.go @@ -21,6 +21,13 @@ type FlattenArguments[K any] struct { ResolveConflicts ottl.Optional[bool] } +type flattenData struct { + result pcommon.Map + existingKeys map[string]int + resolveConflict bool + maxDepth int64 +} + func NewFlattenFactory[K any]() ottl.Factory[K] { return ottl.NewFactory("flatten", &FlattenArguments[K]{}, createFlattenFunction[K]) } @@ -49,9 +56,9 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O prefix = p.Get() } - conflict := false + resolveConflict := false if !c.IsEmpty() { - conflict = c.Get() + resolveConflict = c.Get() } return func(ctx context.Context, tCtx K) (any, error) { @@ -60,68 +67,76 @@ func flatten[K any](target ottl.PMapGetter[K], p ottl.Optional[string], d ottl.O return nil, err } - result := pcommon.NewMap() - existingKeys := map[string]int{} - flattenMap(m, result, prefix, 0, depth, conflict, existingKeys) - result.MoveTo(m) + flattenData := initFlattenData(resolveConflict, depth) + flattenData.flattenMap(m, prefix, 0) + flattenData.result.MoveTo(m) return nil, nil }, nil } -func flattenMap(m pcommon.Map, result pcommon.Map, prefix string, currentDepth, maxDepth int64, conflict bool, existingKeys map[string]int) { +func initFlattenData(resolveConflict bool, maxDepth int64) *flattenData { + return &flattenData{ + result: pcommon.NewMap(), + existingKeys: map[string]int{}, + resolveConflict: resolveConflict, + maxDepth: maxDepth, + } +} + +func (f *flattenData) flattenMap(m pcommon.Map, prefix string, currentDepth int64) { if len(prefix) > 0 { prefix += "." } m.Range(func(k string, v pcommon.Value) bool { - return flattenValue(k, v, currentDepth, maxDepth, result, prefix, conflict, existingKeys) + return f.flattenValue(k, v, currentDepth, prefix) }) } -func flattenSlice(s pcommon.Slice, result pcommon.Map, prefix string, currentDepth int64, maxDepth int64, conflict bool, existingKeys map[string]int) { +func (f *flattenData) flattenSlice(s pcommon.Slice, prefix string, currentDepth int64) { for i := 0; i < s.Len(); i++ { - flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, maxDepth, result, prefix, conflict, existingKeys) + f.flattenValue(fmt.Sprintf("%d", i), s.At(i), currentDepth+1, prefix) } } -func flattenValue(k string, v pcommon.Value, currentDepth int64, maxDepth int64, result pcommon.Map, prefix string, conflict bool, existingKeys map[string]int) bool { +func (f *flattenData) flattenValue(k string, v pcommon.Value, currentDepth int64, prefix string) bool { switch { - case v.Type() == pcommon.ValueTypeMap && currentDepth < maxDepth: - flattenMap(v.Map(), result, prefix+k, currentDepth+1, maxDepth, conflict, existingKeys) - case v.Type() == pcommon.ValueTypeSlice && currentDepth < maxDepth: + case v.Type() == pcommon.ValueTypeMap && currentDepth < f.maxDepth: + f.flattenMap(v.Map(), prefix+k, currentDepth+1) + case v.Type() == pcommon.ValueTypeSlice && currentDepth < f.maxDepth: for i := 0; i < v.Slice().Len(); i++ { switch { - case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < maxDepth: - flattenMap(v.Slice().At(i).Map(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys) - case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < maxDepth: - flattenSlice(v.Slice().At(i).Slice(), result, fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2, maxDepth, conflict, existingKeys) + case v.Slice().At(i).Type() == pcommon.ValueTypeMap && currentDepth+1 < f.maxDepth: + f.flattenMap(v.Slice().At(i).Map(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2) + case v.Slice().At(i).Type() == pcommon.ValueTypeSlice && currentDepth+1 < f.maxDepth: + f.flattenSlice(v.Slice().At(i).Slice(), fmt.Sprintf("%v.%v", prefix+k, i), currentDepth+2) default: key := prefix + k - if conflict { - handleConflict(existingKeys, key, v.Slice().At(i), &result) + if f.resolveConflict { + f.handleConflict(key, v.Slice().At(i)) } else { - v.Slice().At(i).CopyTo(result.PutEmpty(fmt.Sprintf("%v.%v", key, i))) + v.Slice().At(i).CopyTo(f.result.PutEmpty(fmt.Sprintf("%v.%v", key, i))) } } } default: key := prefix + k - if conflict { - handleConflict(existingKeys, key, v, &result) + if f.resolveConflict { + f.handleConflict(key, v) } else { - v.CopyTo(result.PutEmpty(key)) + v.CopyTo(f.result.PutEmpty(key)) } } return true } -func handleConflict(existingKeys map[string]int, key string, v pcommon.Value, result *pcommon.Map) { - if _, exists := result.Get(key); exists { - newKey := key + "." + strconv.Itoa(existingKeys[key]) - existingKeys[key]++ - v.CopyTo(result.PutEmpty(newKey)) +func (f *flattenData) handleConflict(key string, v pcommon.Value) { + if _, exists := f.result.Get(key); exists { + newKey := key + "." + strconv.Itoa(f.existingKeys[key]) + f.existingKeys[key]++ + v.CopyTo(f.result.PutEmpty(newKey)) } else { - existingKeys[key] = 0 - v.CopyTo(result.PutEmpty(key)) + f.existingKeys[key] = 0 + v.CopyTo(f.result.PutEmpty(key)) } }