diff --git a/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml new file mode 100644 index 000000000000..84867eac2a07 --- /dev/null +++ b/.chloggen/elasticsearchexporter_logs_dynamic_id.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add config `logs_dynamic_id` to dynamically set the document ID of log records using log record attribute `elasticsearch.document_id` + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36882] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 13ecfa53507d..ca00c7dfefde 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -145,6 +145,9 @@ This can be customised through the following settings: - `prefix_separator`(default=`-`): Set a separator between logstash_prefix and date. - `date_format`(default=`%Y.%m.%d`): Time format (based on strftime) to generate the second part of the Index name. +- `logs_dynamic_id` (optional): Dynamically determines the document ID to be used in Elasticsearch based on a log record attribute. + - `enabled`(default=false): Enable/Disable dynamic ID for log records. If `elasticsearch.document_id` exists and is not an empty string in the log record attributes, it will be used as the document ID. Otherwise, the document ID will be generated by Elasticsearch. The attribute `elasticsearch.document_id` is removed from the final document. + ### Elasticsearch document mapping The Elasticsearch exporter supports several document schemas and preprocessing diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 2200216be4ef..49953e06ddff 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -31,7 +31,7 @@ type bulkIndexer interface { type bulkIndexerSession interface { // Add adds a document to the bulk indexing session. - Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error + Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error // End must be called on the session object once it is no longer // needed, in order to release any associated resources. @@ -126,8 +126,12 @@ type syncBulkIndexerSession struct { } // Add adds an item to the sync bulk indexer session. 
-func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { - err := s.bi.Add(docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates}) +func (s *syncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { + doc := docappender.BulkIndexerItem{Index: index, Body: document, DynamicTemplates: dynamicTemplates} + if docID != "" { + doc.DocumentID = docID + } + err := s.bi.Add(doc) if err != nil { return err } @@ -248,12 +252,15 @@ func (a *asyncBulkIndexer) Close(ctx context.Context) error { // Add adds an item to the async bulk indexer session. // // Adding an item after a call to Close() will panic. -func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, document io.WriterTo, dynamicTemplates map[string]string) error { +func (s asyncBulkIndexerSession) Add(ctx context.Context, index string, docID string, document io.WriterTo, dynamicTemplates map[string]string) error { item := docappender.BulkIndexerItem{ Index: index, Body: document, DynamicTemplates: dynamicTemplates, } + if docID != "" { + item.DocumentID = docID + } select { case <-ctx.Done(): return ctx.Err() diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 2b3d86a30128..9f2139e83710 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -102,7 +102,7 @@ func TestAsyncBulkIndexer_flush(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, 
int64(1), bulkIndexer.stats.docsIndexed.Load()) @@ -229,7 +229,7 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) @@ -312,7 +312,7 @@ func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Clie session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.NoError(t, bulkIndexer.Close(context.Background())) return bulkIndexer @@ -338,7 +338,7 @@ func TestSyncBulkIndexer_flushBytes(t *testing.T) { session, err := bi.StartSession(context.Background()) require.NoError(t, err) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, session.Add(context.Background(), "foo", "", strings.NewReader(`{"foo": "bar"}`), nil)) assert.Equal(t, int64(1), reqCnt.Load()) // flush due to flush::bytes assert.NoError(t, bi.Close(context.Background())) } diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 0835396d928f..caf5146f0ac3 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -61,6 +61,9 @@ type Config struct { // fall back to pure TracesIndex, if 'elasticsearch.index.prefix' or 'elasticsearch.index.suffix' are not found in resource or attribute (prio: resource > attribute) TracesDynamicIndex DynamicIndexSetting 
`mapstructure:"traces_dynamic_index"` + // LogsDynamicID configures whether log record attribute `elasticsearch.document_id` is set as the document ID in ES. + LogsDynamicID DynamicIDSettings `mapstructure:"logs_dynamic_id"` + // Pipeline configures the ingest node pipeline name that should be used to process the // events. // @@ -120,6 +123,10 @@ type DynamicIndexSetting struct { Enabled bool `mapstructure:"enabled"` } +type DynamicIDSettings struct { + Enabled bool `mapstructure:"enabled"` +} + // AuthenticationSettings defines user authentication related settings. type AuthenticationSettings struct { // User is used to configure HTTP Basic Authentication. diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index b83beb3e91ba..71700158fa4f 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -74,6 +74,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIDSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -146,6 +149,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIDSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute @@ -218,6 +224,9 @@ func TestConfig(t *testing.T) { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIDSettings{ + Enabled: false, + }, Pipeline: "mypipeline", ClientConfig: withDefaultHTTPClientConfig(func(cfg *confighttp.ClientConfig) { cfg.Timeout = 2 * time.Minute diff --git a/exporter/elasticsearchexporter/exporter.go b/exporter/elasticsearchexporter/exporter.go index ebd3800858a2..08f9d43fe70d 100644 --- 
a/exporter/elasticsearchexporter/exporter.go +++ b/exporter/elasticsearchexporter/exporter.go @@ -23,6 +23,11 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/objmodel" ) +const ( + // documentIDAttributeName is the attribute name used to specify the document ID. + documentIDAttributeName = "elasticsearch.document_id" +) + type elasticsearchExporter struct { component.TelemetrySettings userAgent string @@ -173,11 +178,13 @@ func (e *elasticsearchExporter) pushLogRecord( fIndex = formattedIndex } + docID := e.extractDocumentIDAttribute(record.Attributes()) document, err := e.model.encodeLog(resource, resourceSchemaURL, record, scope, scopeSchemaURL) if err != nil { return fmt.Errorf("failed to encode log event: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + + return bulkIndexerSession.Add(ctx, fIndex, docID, bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushMetricsData( @@ -300,7 +307,8 @@ func (e *elasticsearchExporter) pushMetricsData( errs = append(errs, err) continue } - if err := session.Add(ctx, fIndex, bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { + + if err := session.Add(ctx, fIndex, "", bytes.NewReader(docBytes), doc.DynamicTemplates()); err != nil { if cerr := ctx.Err(); cerr != nil { return cerr } @@ -415,7 +423,7 @@ func (e *elasticsearchExporter) pushTraceRecord( if err != nil { return fmt.Errorf("failed to encode trace record: %w", err) } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(document), nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(document), nil) } func (e *elasticsearchExporter) pushSpanEvent( @@ -449,5 +457,19 @@ func (e *elasticsearchExporter) pushSpanEvent( if err != nil { return err } - return bulkIndexerSession.Add(ctx, fIndex, bytes.NewReader(docBytes), nil) + return bulkIndexerSession.Add(ctx, fIndex, "", bytes.NewReader(docBytes), nil) +} + +func (e 
*elasticsearchExporter) extractDocumentIDAttribute(m pcommon.Map) (docID string) { + if !e.config.LogsDynamicID.Enabled { + return + } + m.RemoveIf(func(k string, value pcommon.Value) bool { + if k == documentIDAttributeName { + docID = value.AsString() + return true + } + return false + }) + return } diff --git a/exporter/elasticsearchexporter/exporter_test.go b/exporter/elasticsearchexporter/exporter_test.go index 33a0cf6d1383..046134607dc0 100644 --- a/exporter/elasticsearchexporter/exporter_test.go +++ b/exporter/elasticsearchexporter/exporter_test.go @@ -734,6 +734,83 @@ func TestExporterLogs(t *testing.T) { assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw) assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw) }) + + t.Run("publish logs with dynamic id", func(t *testing.T) { + t.Parallel() + exampleDocID := "abc123" + tableTests := []struct { + name string + expectedDocID string // "" means the _id will not be set + recordAttrs map[string]any + }{ + { + name: "missing document id attribute should not set _id", + expectedDocID: "", + }, + { + name: "empty document id attribute should not set _id", + expectedDocID: "", + recordAttrs: map[string]any{ + documentIDAttributeName: "", + }, + }, + { + name: "record attributes", + expectedDocID: exampleDocID, + recordAttrs: map[string]any{ + documentIDAttributeName: exampleDocID, + }, + }, + } + + cfgs := map[string]func(*Config){ + "async": func(cfg *Config) { + batcherEnabled := false + cfg.Batcher.Enabled = &batcherEnabled + }, + "sync": func(cfg *Config) { + batcherEnabled := true + cfg.Batcher.Enabled = &batcherEnabled + cfg.Batcher.FlushTimeout = 10 * time.Millisecond + }, + } + for _, tt := range tableTests { + for cfgName, cfgFn := range cfgs { + t.Run(tt.name+"/"+cfgName, func(t *testing.T) { + t.Parallel() + rec := newBulkRecorder() + server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { + rec.Record(docs) + + 
if tt.expectedDocID == "" { + assert.NotContains(t, string(docs[0].Action), "_id", "expected _id to not be set") + } else { + assert.Equal(t, tt.expectedDocID, actionJSONToID(t, docs[0].Action), "expected _id to be set") + } + + // Ensure the document id attribute is removed from the final document. + assert.NotContains(t, string(docs[0].Document), documentIDAttributeName, "expected document id attribute to be removed") + return itemsAllOK(docs) + }) + + exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) { + cfg.Mapping.Mode = "otel" + cfg.LogsDynamicID.Enabled = true + cfgFn(cfg) + }) + logs := newLogsWithAttributes( + tt.recordAttrs, + map[string]any{}, + map[string]any{}, + ) + logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr("hello world") + mustSendLogs(t, exporter, logs) + + rec.WaitItems(1) + }) + } + } + }) } func TestExporterMetrics(t *testing.T) { @@ -1909,3 +1986,14 @@ func actionJSONToIndex(t *testing.T, actionJSON json.RawMessage) string { require.NoError(t, err) return action.Create.Index } + +func actionJSONToID(t *testing.T, actionJSON json.RawMessage) string { + action := struct { + Create struct { + ID string `json:"_id"` + } `json:"create"` + }{} + err := json.Unmarshal(actionJSON, &action) + require.NoError(t, err) + return action.Create.ID +} diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 4783d430196a..3a73f19db86c 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -63,6 +63,9 @@ func createDefaultConfig() component.Config { TracesDynamicIndex: DynamicIndexSetting{ Enabled: false, }, + LogsDynamicID: DynamicIDSettings{ + Enabled: false, + }, Retry: RetrySettings{ Enabled: true, MaxRetries: 0, // default is set in exporter code