diff --git a/bulk_indexer.go b/bulk_indexer.go index 4b946ad..12645db 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -270,14 +270,15 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int { } type BulkIndexerItem struct { - Index string - DocumentID string - Body io.WriterTo + Index string + DocumentID string + Body io.WriterTo + DynamicTemplates map[string]string } // Add encodes an item in the buffer. func (b *BulkIndexer) Add(item BulkIndexerItem) error { - b.writeMeta(item.Index, item.DocumentID) + b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates) if _, err := item.Body.WriteTo(b.writer); err != nil { return fmt.Errorf("failed to write bulk indexer item: %w", err) } @@ -288,18 +289,39 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error { return nil } -func (b *BulkIndexer) writeMeta(index, documentID string) { +func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) { b.jsonw.RawString(`{"create":{`) + first := true if documentID != "" { b.jsonw.RawString(`"_id":`) b.jsonw.String(documentID) + first = false } if index != "" { - if documentID != "" { + if !first { b.jsonw.RawByte(',') } b.jsonw.RawString(`"_index":`) b.jsonw.String(index) + first = false + } + if len(dynamicTemplates) > 0 { + if !first { + b.jsonw.RawByte(',') + } + b.jsonw.RawString(`"dynamic_templates":{`) + firstDynamicTemplate := true + for k, v := range dynamicTemplates { + if !firstDynamicTemplate { + b.jsonw.RawByte(',') + } + b.jsonw.String(k) + b.jsonw.RawByte(':') + b.jsonw.String(v) + firstDynamicTemplate = false + } + b.jsonw.RawByte('}') + first = false } b.jsonw.RawString("}}\n") b.writer.Write(b.jsonw.Bytes()) diff --git a/bulk_indexer_test.go b/bulk_indexer_test.go index 08de4f4..d8497cc 100644 --- a/bulk_indexer_test.go +++ b/bulk_indexer_test.go @@ -128,3 +128,40 @@ func TestBulkIndexer(t *testing.T) { }) } } + +func TestDynamicTemplates(t *testing.T) { + client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + _, result, _, dynamicTemplates := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplates(r) + require.Equal(t, []map[string]string{ + {"one": "two", "three": "four"}, + {"five": "six", "seven": "eight"}, + }, dynamicTemplates) + json.NewEncoder(w).Encode(result) + }) + indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{ + Client: client, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + DynamicTemplates: map[string]string{"one": "two", "three": "four"}, + }) + require.NoError(t, err) + + err = indexer.Add(docappender.BulkIndexerItem{ + Index: "testidx", + Body: newJSONReader(map[string]any{ + "@timestamp": time.Now().Format(docappendertest.TimestampFormat), + }), + DynamicTemplates: map[string]string{"five": "six", "seven": "eight"}, + }) + require.NoError(t, err) + + stat, err := indexer.Flush(context.Background()) + require.NoError(t, err) + require.Equal(t, int64(2), stat.Indexed) +} diff --git a/docappendertest/docappendertest.go b/docappendertest/docappendertest.go index 2f33ab9..ee224cb 100644 --- a/docappendertest/docappendertest.go +++ b/docappendertest/docappendertest.go @@ -56,6 +56,17 @@ func DecodeBulkRequestWithStats(r *http.Request) ( docs [][]byte, res esutil.BulkIndexerResponse, stats RequestStats) { + indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r) + return indexed, result, stats +} + +// DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body, +// returning the decoded documents and a response body and stats about request, and per-request dynamic templates. +func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( + docs [][]byte, + res esutil.BulkIndexerResponse, + stats RequestStats, + dynamicTemplates []map[string]string) { body := r.Body switch r.Header.Get("Content-Encoding") { case "gzip": @@ -76,7 +87,8 @@ func DecodeBulkRequestWithStats(r *http.Request) ( var result esutil.BulkIndexerResponse for scanner.Scan() { action := make(map[string]struct { - Index string `json:"_index"` + Index string `json:"_index"` + DynamicTemplates map[string]string `json:"dynamic_templates"` }) if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil { panic(err) @@ -96,8 +108,9 @@ func DecodeBulkRequestWithStats(r *http.Request) ( item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index} result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item}) + dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates) } - return indexed, result, RequestStats{int64(cr.bytesRead)} + return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates } // NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.