From 1c3b656a372794f15491593bcadc62dc2e3da945 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 09:07:46 -0300 Subject: [PATCH 01/13] Add support for setting a document id --- exporter/elasticsearchexporter/README.md | 5 + exporter/elasticsearchexporter/bulkindexer.go | 15 ++- .../elasticsearchexporter/bulkindexer_test.go | 8 +- exporter/elasticsearchexporter/config.go | 3 + exporter/elasticsearchexporter/config_test.go | 9 ++ exporter/elasticsearchexporter/exporter.go | 26 ++++- .../elasticsearchexporter/exporter_test.go | 100 ++++++++++++++++++ exporter/elasticsearchexporter/factory.go | 3 + 8 files changed, 157 insertions(+), 12 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 13ecfa53507d..b418e37e561c 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -145,6 +145,11 @@ This can be customised through the following settings: - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. +- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. + +TODO(mauri870): Add metrics and traces dynamic ID support. + ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 2200216be4ef..bfd3b9c25134 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error + Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -126,8 +126,12 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { - err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { + doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates} + if docID != nil { + doc.DocumentID = *docID + } + err := s.bi.Add(doc) if err != nil { return err } @@ -248,12 +252,15 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DynamicTemplates: dynamicTemplates, } + if docID != nil { + item.DocumentID = *docID + } select { case <-ctx.Done(): return ctx.Err() diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 2b3d86a30128..850fffe48cd7 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 0835396d928f..198b03ac57a8 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -61,6 +61,9 @@ type Config struct { // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` + // LogsDynamicID is used to configure the document id for logs. + LogsDynamicID DynamicIndexSetting `mapstructure:"logs_dynamic_id"` + // Pipeline configures the ingest node pipeline name that should be used to process the // events. // diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index b83beb3e91ba..310a5501887d 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -74,6 +74,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -146,6 +149,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -218,6 +224,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2..47d1efe33b38 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -23,6 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) +const ( + // documentIDAttributeName is the attribute name used to specify the document ID. + documentIDAttributeName = "elasticsearch.document_id" +) + type elasticsearchExporter struct { component.TelemetrySettings userAgent string @@ -177,7 +182,9 @@ func (e *elasticsearchExporter) pushLogRecord( if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + + docID := e.getDocumentIDAttribute(record.Attributes(), scope.Attributes(), resource.Attributes()) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) } func (e *elasticsearchExporter) pushMetricsData( @@ -300,7 +307,8 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { + + if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates(), nil); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -415,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -449,5 +457,15 @@ func (e *elasticsearchExporter) pushSpanEvent( if err != nil { return err } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) + return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) +} + +func (e *elasticsearchExporter) getDocumentIDAttribute(attributeMaps ...pcommon.Map) *string { + if e.config.LogsDynamicID.Enabled { + docID, ok := getFromAttributes(documentIDAttributeName, "", attributeMaps...) + if docID != "" && ok { + return &docID + } + } + return nil } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 5554c089e02b..6bb2cf839dc2 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -734,6 +734,95 @@ func TestExporterLogs(t *testing.T) { assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("publish logs with dynamic id", func(t *testing.T) { + exampleDocID := "abc123" + tableTests := []struct { + name string + expectedDocID *string // nil means the _id will not be set + recordAttrs map[string]any + scopeAttrs map[string]any + resourceAttrs map[string]any + }{ + { + name: "missing document id attribute should not set _id", + expectedDocID: nil, + }, + { + name: "record attributes", + expectedDocID: &exampleDocID, + recordAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "scope attributes", + expectedDocID: &exampleDocID, + scopeAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "resource attributes", + expectedDocID: &exampleDocID, + resourceAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + { + name: "record attributes takes precedence over others", + expectedDocID: &exampleDocID, + recordAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + scopeAttrs: map[string]any{ + documentIDAttributeName: "id1", + }, + resourceAttrs: map[string]any{ + documentIDAttributeName: "id2", + }, + }, + { + name: "scope attributes takes precedence over resource attributes", + expectedDocID: &exampleDocID, + scopeAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + resourceAttrs: map[string]any{ + documentIDAttributeName: "id1", + }, + }, + } + + for _, tt := range tableTests { + t.Run(tt.name, func(t *testing.T) { + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + if tt.expectedDocID == nil { + assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") + } else { + assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + } + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicID.Enabled = true + }) + logs := newLogsWithAttributes( + tt.recordAttrs, + tt.scopeAttrs, + tt.resourceAttrs, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + } + }) } func TestExporterMetrics(t *testing.T) { @@ -1909,3 +1998,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string { require.NoError(t, err) return action.Create.Index } + +func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string { + action := struct { + Create struct { + ID string `json:"_id"` + } `json:"create"` + }{} + err := json.Unmarshal(actionJSON, &action) + require.NoError(t, err) + return action.Create.ID +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 4783d430196a..5ce0116d14f2 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -63,6 +63,9 @@ func createDefaultConfig() component.Config { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIndexSetting{ + Enabled: false, + }, Retry: RetrySettings{ Enabled: true, MaxRetries: 0, // default is set in exporter code From 7128be6341fbe629794650d4f7c570bbf801b05d Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 09:52:10 -0300 Subject: [PATCH 02/13] run tests in parallel, test with sync and async bulk indexer --- .../elasticsearchexporter/exporter_test.go | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 6bb2cf839dc2..9612a279c965 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -736,6 +736,7 @@ func TestExporterLogs(t *testing.T) { }) t.Run("publish logs with dynamic id", func(t *testing.T) { + t.Parallel() exampleDocID := "abc123" tableTests := []struct { name string @@ -794,33 +795,48 @@ func TestExporterLogs(t *testing.T) { }, } + cfgs := map[string]func(*Config){ + "sync": func(cfg *Config) { + batcherEnabled := false + cfg.Batcher.Enabled = &batcherEnabled + }, + "async": func(cfg *Config) { + batcherEnabled := true + cfg.Batcher.Enabled = &batcherEnabled + cfg.Batcher.FlushTimeout = 10 * time.Millisecond + }, + } for _, tt := range tableTests { - t.Run(tt.name, func(t *testing.T) { - rec := newBulkRecorder() - server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { - rec.Record(docs) - - if tt.expectedDocID == nil { - assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") - } else { - assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") - } - return itemsAllOK(docs) - }) + for cfgName, cfgFn := range cfgs { + t.Run(tt.name+"/"+cfgName, func(t *testing.T) { + t.Parallel() + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + if tt.expectedDocID == nil { + assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") + } else { + assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + } + return itemsAllOK(docs) + }) - exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { - cfg.LogsDynamicID.Enabled = true + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.LogsDynamicID.Enabled = true + cfgFn(cfg) + }) + logs := newLogsWithAttributes( + tt.recordAttrs, + tt.scopeAttrs, + tt.resourceAttrs, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) }) - logs := newLogsWithAttributes( - tt.recordAttrs, - tt.scopeAttrs, - tt.resourceAttrs, - ) - logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") - mustSendLogs(t, exporter, logs) - - rec.WaitItems(1) - }) + } } }) } From c277b557f9a5154da048639cef1af3663812af81 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 10:41:27 -0300 Subject: [PATCH 03/13] add changelog entry --- ...elasticsearchexporter_logs_dynamic_id.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/elasticsearchexporter_logs_dynamic_id.yaml diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml new file mode 100644 index 000000000000..a846bbcc1445 --- /dev/null +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support for complex attributes for log records in OTel mode + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36882] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From 89f19d399fdd7cc1b641d17e3eb126b253a5d9d0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 7 Jan 2025 10:44:50 -0300 Subject: [PATCH 04/13] remove todo for metrics and traces --- exporter/elasticsearchexporter/README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index b418e37e561c..986171f40d27 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -148,8 +148,6 @@ This can be customised through the following settings: - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. -TODO(mauri870): Add metrics and traces dynamic ID support. - ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing From dc2505745485323b3f9a9b3021ed415f4d267aea Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 8 Jan 2025 09:09:44 -0300 Subject: [PATCH 05/13] only look into record attributes --- exporter/elasticsearchexporter/README.md | 4 +- exporter/elasticsearchexporter/exporter.go | 6 +-- .../elasticsearchexporter/exporter_test.go | 43 +------------------ 3 files changed, 7 insertions(+), 46 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 986171f40d27..fb60c308ffba 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -145,8 +145,8 @@ This can be customised through the following settings: - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. -- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on resource, scope, or log record attributes. - - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in attributes (precedence: log record attribute > scope attribute > resource attribute), it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. +- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. ### Elasticsearch document mapping diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 47d1efe33b38..6ca7c465b880 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -183,7 +183,7 @@ func (e *elasticsearchExporter) pushLogRecord( return fmt.Errorf("failed to encode log event: %w", err) } - docID := e.getDocumentIDAttribute(record.Attributes(), scope.Attributes(), resource.Attributes()) + docID := e.getDocumentIDAttribute(record.Attributes()) return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) } @@ -460,9 +460,9 @@ func (e *elasticsearchExporter) pushSpanEvent( return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(attributeMaps ...pcommon.Map) *string { +func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) *string { if e.config.LogsDynamicID.Enabled { - docID, ok := getFromAttributes(documentIDAttributeName, "", attributeMaps...) + docID, ok := getFromAttributes(documentIDAttributeName, "", m) if docID != "" && ok { return &docID } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 9612a279c965..86e62a6364b7 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -742,8 +742,6 @@ func TestExporterLogs(t *testing.T) { name string expectedDocID *string // nil means the _id will not be set recordAttrs map[string]any - scopeAttrs map[string]any - resourceAttrs map[string]any }{ { name: "missing document id attribute should not set _id", @@ -756,43 +754,6 @@ func TestExporterLogs(t *testing.T) { documentIDAttributeName: exampleDocID, }, }, - { - name: "scope attributes", - expectedDocID: &exampleDocID, - scopeAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - }, - { - name: "resource attributes", - expectedDocID: &exampleDocID, - resourceAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - }, - { - name: "record attributes takes precedence over others", - expectedDocID: &exampleDocID, - recordAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - scopeAttrs: map[string]any{ - documentIDAttributeName: "id1", - }, - resourceAttrs: map[string]any{ - documentIDAttributeName: "id2", - }, - }, - { - name: "scope attributes takes precedence over resource attributes", - expectedDocID: &exampleDocID, - scopeAttrs: map[string]any{ - documentIDAttributeName: exampleDocID, - }, - resourceAttrs: map[string]any{ - documentIDAttributeName: "id1", - }, - }, } cfgs := map[string]func(*Config){ @@ -828,8 +789,8 @@ func TestExporterLogs(t *testing.T) { }) logs := newLogsWithAttributes( tt.recordAttrs, - tt.scopeAttrs, - tt.resourceAttrs, + map[string]any{}, + map[string]any{}, ) logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") mustSendLogs(t, exporter, logs) From 7f2557c15c13a5e5144b1bb09e54952e18f911d0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 08:35:39 -0300 Subject: [PATCH 06/13] fixes from code review --- ...elasticsearchexporter_logs_dynamic_id.yaml | 2 +- exporter/elasticsearchexporter/bulkindexer.go | 14 ++++++------- .../elasticsearchexporter/bulkindexer_test.go | 8 +++---- exporter/elasticsearchexporter/exporter.go | 14 ++++++------- .../elasticsearchexporter/exporter_test.go | 21 ++++++++++++------- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml index a846bbcc1445..9acda46d18f8 100644 --- a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Support for complex attributes for log records in OTel mode +note: Support for dynamically setting the document ID of log records. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [36882] diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index bfd3b9c25134..49953e06ddff 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error + Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -126,10 +126,10 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. -func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates} - if docID != nil { - doc.DocumentID = *docID + if docID != "" { + doc.DocumentID = docID } err := s.bi.Add(doc) if err != nil { @@ -252,14 +252,14 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string, docID *string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DynamicTemplates: dynamicTemplates, } - if docID != nil { - item.DocumentID = *docID + if docID != "" { + item.DocumentID = docID } select { case <-ctx.Done(): diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 850fffe48cd7..9f2139e83710 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil, nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 6ca7c465b880..eb59cafbea79 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -184,7 +184,7 @@ func (e *elasticsearchExporter) pushLogRecord( } docID := e.getDocumentIDAttribute(record.Attributes()) - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, docID) + return bulkIndexerSession.Add(ctx, fIndex, docID, bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -308,7 +308,7 @@ func (e *elasticsearchExporter) pushMetricsData( continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates(), nil); err != nil { + if err := session.Add(ctx, fIndex, "", bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -423,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil, nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -457,15 +457,15 @@ func (e *elasticsearchExporter) pushSpanEvent( if err != nil { return err } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil, nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) *string { +func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) string { if e.config.LogsDynamicID.Enabled { docID, ok := getFromAttributes(documentIDAttributeName, "", m) if docID != "" && ok { - return &docID + return docID } } - return nil + return "" } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 86e62a6364b7..7f4f4f2a659b 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -740,16 +740,23 @@ func TestExporterLogs(t *testing.T) { exampleDocID := "abc123" tableTests := []struct { name string - expectedDocID *string // nil means the _id will not be set + expectedDocID string // "" means the _id will not be set recordAttrs map[string]any }{ { name: "missing document id attribute should not set _id", - expectedDocID: nil, + expectedDocID: "", + }, + { + name: "empty document id attribute should not set _id", + expectedDocID: "", + recordAttrs: map[string]any{ + documentIDAttributeName: "", + }, }, { name: "record attributes", - expectedDocID: &exampleDocID, + expectedDocID: exampleDocID, recordAttrs: map[string]any{ documentIDAttributeName: exampleDocID, }, @@ -757,11 +764,11 @@ func TestExporterLogs(t *testing.T) { } cfgs := map[string]func(*Config){ - "sync": func(cfg *Config) { + "async": func(cfg *Config) { batcherEnabled := false cfg.Batcher.Enabled = &batcherEnabled }, - "async": func(cfg *Config) { + "sync": func(cfg *Config) { batcherEnabled := true cfg.Batcher.Enabled = &batcherEnabled cfg.Batcher.FlushTimeout = 10 * time.Millisecond @@ -775,10 +782,10 @@ func TestExporterLogs(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { rec.Record(docs) - if tt.expectedDocID == nil { + if tt.expectedDocID == "" { assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") } else { - assert.Equal(t, *tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") } return itemsAllOK(docs) }) From 108079071e6cef13b4b4d3eb9d526852d017c2b9 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 08:38:13 -0300 Subject: [PATCH 07/13] clarify about empty string value --- exporter/elasticsearchexporter/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index fb60c308ffba..b77844edeb0a 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -146,7 +146,7 @@ This can be customised through the following settings: - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. - - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is != "" in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. ### Elasticsearch document mapping From 9e0ad2f4c090d5b971ecb4d069f01da14ffd99a0 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 9 Jan 2025 15:17:09 -0300 Subject: [PATCH 08/13] remove id attribute from the final document --- exporter/elasticsearchexporter/exporter.go | 5 +++-- exporter/elasticsearchexporter/exporter_test.go | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index eb59cafbea79..172f088cf650 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -178,12 +178,12 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } + docID := e.extractDocumentIDAttribute(record.Attributes()) document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - docID := e.getDocumentIDAttribute(record.Attributes()) return bulkIndexerSession.Add(ctx, fIndex, docID, bytes.NewReader(document), nil) } @@ -460,9 +460,10 @@ func (e *elasticsearchExporter) pushSpanEvent( return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) } -func (e *elasticsearchExporter) getDocumentIDAttribute(m pcommon.Map) string { +func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string { if e.config.LogsDynamicID.Enabled { docID, ok := getFromAttributes(documentIDAttributeName, "", m) + m.Remove(documentIDAttributeName) if docID != "" && ok { return docID } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 7f4f4f2a659b..a0a838785aea 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -787,10 +787,14 @@ func TestExporterLogs(t *testing.T) { } else { assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") } + + // Ensure the document id attribute is removed from the final document. + assert.NotContains(t, docs[0].Document, documentIDAttributeName, "expected document id attribute to be removed") return itemsAllOK(docs) }) exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" cfg.LogsDynamicID.Enabled = true cfgFn(cfg) }) From b3810ddddfdca4e4404618f17d01a7468598eac2 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 10 Jan 2025 13:48:05 -0300 Subject: [PATCH 09/13] Apply suggestions from code review Co-authored-by: Carson Ip --- .chloggen/elasticsearchexporter_logs_dynamic_id.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml index 9acda46d18f8..84867eac2a07 100644 --- a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: elasticsearchexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Support for dynamically setting the document ID of log records. +note: Add config `logs_dynamic_id` to dynamically set the document ID of log records using log record attribute `elasticsearch.document_id` # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [36882] From 47be731bcff8c1a3e13ce26dd3909c889ab140b9 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 10 Jan 2025 13:52:47 -0300 Subject: [PATCH 10/13] mention that the document id is removed from the log record --- exporter/elasticsearchexporter/README.md | 2 +- exporter/elasticsearchexporter/config.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index b77844edeb0a..ca00c7dfefde 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -146,7 +146,7 @@ This can be customised through the following settings: - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. - `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. - - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is != "" in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document. ### Elasticsearch document mapping diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 198b03ac57a8..abdb5d6f2bab 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -62,7 +62,7 @@ type Config struct { TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` // LogsDynamicID is used to configure the document id for logs. - LogsDynamicID DynamicIndexSetting `mapstructure:"logs_dynamic_id"` + LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"` // Pipeline configures the ingest node pipeline name that should be used to process the // events. @@ -123,6 +123,10 @@ type DynamicIndexSetting struct { Enabled bool `mapstructure:"enabled"` } +type DynamicIDSettings struct { + Enabled bool `mapstructure:"enabled"` +} + // AuthenticationSettings defines user authentication related settings. type AuthenticationSettings struct { // User is used to configure HTTP Basic Authentication. From d1be5905e18eeb08e13309d528bd9cd97dd3bcc1 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 10 Jan 2025 14:00:30 -0300 Subject: [PATCH 11/13] Update exporter/elasticsearchexporter/config.go Co-authored-by: Carson Ip --- exporter/elasticsearchexporter/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index abdb5d6f2bab..caf5146f0ac3 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -61,7 +61,7 @@ type Config struct { // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) TracesDynamicIndex DynamicIndexSetting `mapstructure:"traces_dynamic_index"` - // LogsDynamicID is used to configure the document id for logs. + // LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES. LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"` // Pipeline configures the ingest node pipeline name that should be used to process the From 08330dad603c421e8f4526c9382bcf170d206956 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 10 Jan 2025 14:03:15 -0300 Subject: [PATCH 12/13] fixes for the recent changes in config --- exporter/elasticsearchexporter/config_test.go | 6 +++--- exporter/elasticsearchexporter/factory.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 310a5501887d..71700158fa4f 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -74,7 +74,7 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - LogsDynamicID: DynamicIndexSetting{ + LogsDynamicID: DynamicIDSettings{ Enabled: false, }, Pipeline: "mypipeline", @@ -149,7 +149,7 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - LogsDynamicID: DynamicIndexSetting{ + LogsDynamicID: DynamicIDSettings{ Enabled: false, }, Pipeline: "mypipeline", @@ -224,7 +224,7 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - LogsDynamicID: DynamicIndexSetting{ + LogsDynamicID: DynamicIDSettings{ Enabled: false, }, Pipeline: "mypipeline", diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 5ce0116d14f2..3a73f19db86c 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -63,7 +63,7 @@ func createDefaultConfig() component.Config { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, - LogsDynamicID: DynamicIndexSetting{ + LogsDynamicID: DynamicIDSettings{ Enabled: false, }, Retry: RetrySettings{ From f60d701e8f9fa6c15afaf7b27fe67c2a9d7844c6 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 10 Jan 2025 14:06:04 -0300 Subject: [PATCH 13/13] Use RemoveIf --- exporter/elasticsearchexporter/exporter.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index 172f088cf650..08f9d43fe70d 100644 --- a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -460,13 +460,16 @@ func (e *elasticsearchExporter) pushSpanEvent( return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) } -func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) string { - if e.config.LogsDynamicID.Enabled { - docID, ok := getFromAttributes(documentIDAttributeName, "", m) - m.Remove(documentIDAttributeName) - if docID != "" && ok { - return docID - } +func (e *elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) (docID string) { + if !e.config.LogsDynamicID.Enabled { + return } - return "" + m.RemoveIf(func(k string, value pcommon.Value) bool { + if k == documentIDAttributeName { + docID = value.AsString() + return true + } + return false + }) + return }