Skip to content

Commit

Permalink
Add dynamic templates to bulk indexer item (#189)
Browse files Browse the repository at this point in the history
Make bulk request accept dynamic templates by adding DynamicTemplates field to bulk indexer item. This is only added to bulk indexer, not appender.
  • Loading branch information
carsonip authored Aug 7, 2024
1 parent b180c1b commit 3e17944
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 8 deletions.
34 changes: 28 additions & 6 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,15 @@ func (b *BulkIndexer) BytesUncompressedFlushed() int {
}

type BulkIndexerItem struct {
Index string
DocumentID string
Body io.WriterTo
Index string
DocumentID string
Body io.WriterTo
DynamicTemplates map[string]string
}

// Add encodes an item in the buffer.
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
b.writeMeta(item.Index, item.DocumentID)
b.writeMeta(item.Index, item.DocumentID, item.DynamicTemplates)
if _, err := item.Body.WriteTo(b.writer); err != nil {
return fmt.Errorf("failed to write bulk indexer item: %w", err)
}
Expand All @@ -288,18 +289,39 @@ func (b *BulkIndexer) Add(item BulkIndexerItem) error {
return nil
}

func (b *BulkIndexer) writeMeta(index, documentID string) {
func (b *BulkIndexer) writeMeta(index, documentID string, dynamicTemplates map[string]string) {
b.jsonw.RawString(`{"create":{`)
first := true
if documentID != "" {
b.jsonw.RawString(`"_id":`)
b.jsonw.String(documentID)
first = false
}
if index != "" {
if documentID != "" {
if !first {
b.jsonw.RawByte(',')
}
b.jsonw.RawString(`"_index":`)
b.jsonw.String(index)
first = false
}
if len(dynamicTemplates) > 0 {
if !first {
b.jsonw.RawByte(',')
}
b.jsonw.RawString(`"dynamic_templates":{`)
firstDynamicTemplate := true
for k, v := range dynamicTemplates {
if !firstDynamicTemplate {
b.jsonw.RawByte(',')
}
b.jsonw.String(k)
b.jsonw.RawByte(':')
b.jsonw.String(v)
firstDynamicTemplate = false
}
b.jsonw.RawByte('}')
first = false
}
b.jsonw.RawString("}}\n")
b.writer.Write(b.jsonw.Bytes())
Expand Down
37 changes: 37 additions & 0 deletions bulk_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,40 @@ func TestBulkIndexer(t *testing.T) {
})
}
}

func TestDynamicTemplates(t *testing.T) {
client := docappendertest.NewMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) {
_, result, _, dynamicTemplates := docappendertest.DecodeBulkRequestWithStatsAndDynamicTemplates(r)
require.Equal(t, []map[string]string{
{"one": "two", "three": "four"},
{"five": "six", "seven": "eight"},
}, dynamicTemplates)
json.NewEncoder(w).Encode(result)
})
indexer, err := docappender.NewBulkIndexer(docappender.BulkIndexerConfig{
Client: client,
})
require.NoError(t, err)

err = indexer.Add(docappender.BulkIndexerItem{
Index: "testidx",
Body: newJSONReader(map[string]any{
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
}),
DynamicTemplates: map[string]string{"one": "two", "three": "four"},
})
require.NoError(t, err)

err = indexer.Add(docappender.BulkIndexerItem{
Index: "testidx",
Body: newJSONReader(map[string]any{
"@timestamp": time.Now().Format(docappendertest.TimestampFormat),
}),
DynamicTemplates: map[string]string{"five": "six", "seven": "eight"},
})
require.NoError(t, err)

stat, err := indexer.Flush(context.Background())
require.NoError(t, err)
require.Equal(t, int64(2), stat.Indexed)
}
17 changes: 15 additions & 2 deletions docappendertest/docappendertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ func DecodeBulkRequestWithStats(r *http.Request) (
docs [][]byte,
res esutil.BulkIndexerResponse,
stats RequestStats) {
indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r)
return indexed, result, stats
}

// DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body,
// returning the decoded documents and a response body and stats about request, and per-request dynamic templates.
func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) (
docs [][]byte,
res esutil.BulkIndexerResponse,
stats RequestStats,
dynamicTemplates []map[string]string) {
body := r.Body
switch r.Header.Get("Content-Encoding") {
case "gzip":
Expand All @@ -76,7 +87,8 @@ func DecodeBulkRequestWithStats(r *http.Request) (
var result esutil.BulkIndexerResponse
for scanner.Scan() {
action := make(map[string]struct {
Index string `json:"_index"`
Index string `json:"_index"`
DynamicTemplates map[string]string `json:"dynamic_templates"`
})
if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil {
panic(err)
Expand All @@ -96,8 +108,9 @@ func DecodeBulkRequestWithStats(r *http.Request) (

item := esutil.BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index}
result.Items = append(result.Items, map[string]esutil.BulkIndexerResponseItem{actionType: item})
dynamicTemplates = append(dynamicTemplates, action[actionType].DynamicTemplates)
}
return indexed, result, RequestStats{int64(cr.bytesRead)}
return indexed, result, RequestStats{int64(cr.bytesRead)}, dynamicTemplates
}

// NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler.
Expand Down

0 comments on commit 3e17944

Please sign in to comment.