diff --git a/appender.go b/appender.go index f89a8ed..be8007d 100644 --- a/appender.go +++ b/appender.go @@ -269,34 +269,32 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { return err } var docsFailed, docsIndexed, tooManyRequests, clientFailed, serverFailed int64 - for _, item := range resp.Items { - for _, info := range item { - if info.Error.Type != "" || info.Status > 201 { - docsFailed++ - if info.Status >= 400 && info.Status < 500 { - if info.Status == http.StatusTooManyRequests { - tooManyRequests++ - } else { - clientFailed++ - } - } - if info.Status >= 500 { - serverFailed++ - } - // NOTE(axw) error type and reason are included - // in the error message so we can observe different - // error types/reasons when logging is rate limited. - logger.Error(fmt.Sprintf( - "failed to index document in '%s' (%s): %s", - info.Index, info.Error.Type, info.Error.Reason, - )) - - if a.tracingEnabled() { - apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send() + for _, info := range resp { + if info.Error.Type != "" || info.Status > 201 { + docsFailed++ + if info.Status >= 400 && info.Status < 500 { + if info.Status == http.StatusTooManyRequests { + tooManyRequests++ + } else { + clientFailed++ } - } else { - docsIndexed++ } + if info.Status >= 500 { + serverFailed++ + } + // NOTE(axw) error type and reason are included + // in the error message so we can observe different + // error types/reasons when logging is rate limited. + logger.Error(fmt.Sprintf( + "failed to index document in '%s' (%s): %s", + info.Index, info.Error.Type, info.Error.Reason, + )) + + if a.tracingEnabled() { + apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send() + } + } else { + docsIndexed++ } } if docsFailed > 0 { diff --git a/bulk_indexer.go b/bulk_indexer.go index 3949e7e..67da889 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/goccy/go-json" + "github.com/tidwall/gjson" ) // At the time of writing, the go-elasticsearch BulkIndexer implementation @@ -54,12 +54,7 @@ type bulkIndexer struct { copybuf [32 * 1024]byte writer io.Writer buf bytes.Buffer - resp MinimalBulkIndexerResponse -} - -// MinimalBulkIndexerResponse represents a minimal subset of the Elasticsearch _bulk API response. -type MinimalBulkIndexerResponse struct { - Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"` + resp []BulkIndexerResponseItem } // BulkIndexerResponseItem represents the Elasticsearch response item. @@ -92,7 +87,7 @@ func (b *bulkIndexer) Reset() { if b.gzipw != nil { b.gzipw.Reset(&b.buf) } - b.resp = MinimalBulkIndexerResponse{Items: b.resp.Items[:0]} + b.resp = b.resp[:0] } // Added returns the number of buffered items. @@ -151,15 +146,13 @@ func (b *bulkIndexer) writeMeta(index, action, documentID string) { } // Flush executes a bulk request if there are any items buffered, and clears out the buffer. -func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, error) { +func (b *bulkIndexer) Flush(ctx context.Context) ([]BulkIndexerResponseItem, error) { if b.itemsAdded == 0 { - return MinimalBulkIndexerResponse{}, nil + return nil, nil } if b.gzipw != nil { if err := b.gzipw.Close(); err != nil { - return MinimalBulkIndexerResponse{}, fmt.Errorf( - "failed closing the gzip writer: %w", err, - ) + return nil, fmt.Errorf("failed closing the gzip writer: %w", err) } } @@ -171,7 +164,7 @@ func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, er bytesFlushed := b.buf.Len() res, err := req.Do(ctx, b.client) if err != nil { - return MinimalBulkIndexerResponse{}, err + return nil, err } defer res.Body.Close() // Record the number of flushed bytes only when err == nil. The body may @@ -179,13 +172,39 @@ func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, er b.bytesFlushed = bytesFlushed if res.IsError() { if res.StatusCode == http.StatusTooManyRequests { - return MinimalBulkIndexerResponse{}, errorTooManyRequests{res: res} + return nil, errorTooManyRequests{res: res} } - return MinimalBulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String()) + return nil, fmt.Errorf("flush failed: %s", res.String()) } - if err := json.NewDecoder(res.Body).Decode(&b.resp); err != nil { - return MinimalBulkIndexerResponse{}, fmt.Errorf("error decoding bulk response: %w", err) + + rspBody, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) } + + gjson.GetBytes(rspBody, "items").ForEach(func(key, value gjson.Result) bool { + value.ForEach(func(key, value gjson.Result) bool { + item := BulkIndexerResponseItem{ + Index: value.Get("_index").String(), + Status: int(value.Get("status").Int()), + } + + if e := value.Get("error"); e.Exists() { + item.Error = struct { + Type string `json:"type"` + Reason string `json:"reason"` + }{ + Type: e.Get("type").String(), + Reason: e.Get("reason").String(), + } + } + + b.resp = append(b.resp, item) + return true + }) + return true + }) + return b.resp, nil } diff --git a/go.mod b/go.mod index eb2b9b3..87de49a 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.19 require ( github.com/elastic/go-elasticsearch/v8 v8.8.2 - github.com/goccy/go-json v0.10.2 github.com/stretchr/testify v1.8.4 + github.com/tidwall/gjson v1.14.4 go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3 go.elastic.co/apm/module/apmzap/v2 v2.4.3 go.elastic.co/apm/v2 v2.4.3 @@ -25,6 +25,8 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.4.3 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect diff --git a/go.sum b/go.sum index 966ae79..5098639 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,6 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6 github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -37,6 +35,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= +github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3 h1:ahFfYDBPNq6G0Vsqncg3cs0HwcEQ76yKdhQeKKvGPck= go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3/go.mod h1:2kByxP0WjwX/9vRTKGAlqctTr7tQLN7hSfWPNLQJ+eM= go.elastic.co/apm/module/apmhttp/v2 v2.4.3 h1:bBqbbtQSEL+uVpH5CS656E9x6pXha8kkZ468/G0T5Eo=