From 31c7e23227e7428daacd73f9d6fa81be92380bdf Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 10 Jul 2023 03:29:24 +0200 Subject: [PATCH] perf: replace goccy with gjson and store response items in a slice (#43) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: replace goccy with gjson and store response items in a slice We're not using the map keys and just iterating on the values so there is no need for a nested struct. We can just flatten the maps and use a slice. Replace go-json with gjson and store the item responses in a slice. The slice was already being reused so there are no additional allocations. This significantly reduces memory usage and allocations. │ before.txt │ after-nojson-perf-final.txt │ │ sec/op │ sec/op vs base │ Appender/NoCompression-20 503.8n ± 6% 476.1n ± 9% -5.49% (p=0.009 n=10) Appender/NoCompressionScaling-20 874.7n ± 15% 848.9n ± 42% ~ (p=0.305 n=10) Appender/BestSpeed-20 793.3n ± 4% 844.8n ± 5% +6.49% (p=0.003 n=10) Appender/BestSpeedScaling-20 789.6n ± 4% 775.7n ± 7% ~ (p=0.075 n=10) Appender/DefaultCompression-20 1.489µ ± 3% 1.450µ ± 4% -2.62% (p=0.003 n=10) Appender/DefaultCompressionScaling-20 1.469µ ± 4% 1.487µ ± 6% ~ (p=0.739 n=10) Appender/BestCompression-20 1.493µ ± 4% 1.473µ ± 7% ~ (p=0.066 n=10) Appender/BestCompressionScaling-20 1.496µ ± 3% 1.460µ ± 4% ~ (p=0.065 n=10) geomean 1.038µ 1.026µ -1.16% │ before.txt │ after-nojson-perf-final.txt │ │ B/s │ B/s vs base │ Appender/NoCompression-20 253.6Mi ± 6% 268.4Mi ± 8% +5.82% (p=0.009 n=10) Appender/NoCompressionScaling-20 146.1Mi ± 13% 150.6Mi ± 74% ~ (p=0.315 n=10) Appender/BestSpeed-20 161.1Mi ± 3% 151.3Mi ± 5% -6.09% (p=0.003 n=10) Appender/BestSpeedScaling-20 161.8Mi ± 4% 164.8Mi ± 8% ~ (p=0.075 n=10) Appender/DefaultCompression-20 85.83Mi ± 3% 88.14Mi ± 4% +2.69% (p=0.004 n=10) Appender/DefaultCompressionScaling-20 87.01Mi ± 3% 85.94Mi ± 7% ~ (p=0.739 n=10) 
Appender/BestCompression-20 85.61Mi ± 4% 86.81Mi ± 7% ~ (p=0.063 n=10) Appender/BestCompressionScaling-20 85.44Mi ± 3% 87.51Mi ± 4% ~ (p=0.065 n=10) geomean 123.1Mi 124.6Mi +1.18% │ before.txt │ after-nojson-perf-final.txt │ │ B/op │ B/op vs base │ Appender/NoCompression-20 1016.5 ± 0% 333.0 ± 0% -67.24% (p=0.000 n=10) Appender/NoCompressionScaling-20 1026.0 ± 1% 333.0 ± 4% -67.54% (p=0.000 n=10) Appender/BestSpeed-20 1044.0 ± 1% 708.5 ± 3% -32.14% (p=0.000 n=10) Appender/BestSpeedScaling-20 1037.0 ± 1% 695.5 ± 2% -32.93% (p=0.000 n=10) Appender/DefaultCompression-20 1022.5 ± 1% 666.5 ± 6% -34.82% (p=0.000 n=10) Appender/DefaultCompressionScaling-20 1030.0 ± 1% 694.5 ± 5% -32.57% (p=0.000 n=10) Appender/BestCompression-20 1036.5 ± 2% 696.0 ± 2% -32.85% (p=0.000 n=10) Appender/BestCompressionScaling-20 1021.5 ± 2% 699.5 ± 4% -31.52% (p=0.000 n=10) geomean 1.005Ki 577.2 -43.92% │ before.txt │ after-nojson-perf-final.txt │ │ allocs/op │ allocs/op vs base │ Appender/NoCompression-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/NoCompressionScaling-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/BestSpeed-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/BestSpeedScaling-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/DefaultCompression-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/DefaultCompressionScaling-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/BestCompression-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) Appender/BestCompressionScaling-20 6.000 ± 0% 2.000 ± 0% -66.67% (p=0.000 n=10) geomean 6.000 2.000 -66.67% * refactor: wrap read error and remove comments * refactor: remove minimal bulk response and use slice everywhere --- appender.go | 50 +++++++++++++++++++++----------------------- bulk_indexer.go | 55 +++++++++++++++++++++++++++++++++---------------- go.mod | 4 +++- go.sum | 8 +++++-- 4 files changed, 70 insertions(+), 47 deletions(-) diff --git a/appender.go b/appender.go index f89a8ed..be8007d 
100644 --- a/appender.go +++ b/appender.go @@ -269,34 +269,32 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { return err } var docsFailed, docsIndexed, tooManyRequests, clientFailed, serverFailed int64 - for _, item := range resp.Items { - for _, info := range item { - if info.Error.Type != "" || info.Status > 201 { - docsFailed++ - if info.Status >= 400 && info.Status < 500 { - if info.Status == http.StatusTooManyRequests { - tooManyRequests++ - } else { - clientFailed++ - } - } - if info.Status >= 500 { - serverFailed++ - } - // NOTE(axw) error type and reason are included - // in the error message so we can observe different - // error types/reasons when logging is rate limited. - logger.Error(fmt.Sprintf( - "failed to index document in '%s' (%s): %s", - info.Index, info.Error.Type, info.Error.Reason, - )) - - if a.tracingEnabled() { - apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send() + for _, info := range resp { + if info.Error.Type != "" || info.Status > 201 { + docsFailed++ + if info.Status >= 400 && info.Status < 500 { + if info.Status == http.StatusTooManyRequests { + tooManyRequests++ + } else { + clientFailed++ } - } else { - docsIndexed++ } + if info.Status >= 500 { + serverFailed++ + } + // NOTE(axw) error type and reason are included + // in the error message so we can observe different + // error types/reasons when logging is rate limited. 
+ logger.Error(fmt.Sprintf( + "failed to index document in '%s' (%s): %s", + info.Index, info.Error.Type, info.Error.Reason, + )) + + if a.tracingEnabled() { + apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send() + } + } else { + docsIndexed++ } } if docsFailed > 0 { diff --git a/bulk_indexer.go b/bulk_indexer.go index 3949e7e..67da889 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -29,7 +29,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/goccy/go-json" + "github.com/tidwall/gjson" ) // At the time of writing, the go-elasticsearch BulkIndexer implementation @@ -54,12 +54,7 @@ type bulkIndexer struct { copybuf [32 * 1024]byte writer io.Writer buf bytes.Buffer - resp MinimalBulkIndexerResponse -} - -// MinimalBulkIndexerResponse represents a minimal subset of the Elasticsearch _bulk API response. -type MinimalBulkIndexerResponse struct { - Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"` + resp []BulkIndexerResponseItem } // BulkIndexerResponseItem represents the Elasticsearch response item. @@ -92,7 +87,7 @@ func (b *bulkIndexer) Reset() { if b.gzipw != nil { b.gzipw.Reset(&b.buf) } - b.resp = MinimalBulkIndexerResponse{Items: b.resp.Items[:0]} + b.resp = b.resp[:0] } // Added returns the number of buffered items. @@ -151,15 +146,13 @@ func (b *bulkIndexer) writeMeta(index, action, documentID string) { } // Flush executes a bulk request if there are any items buffered, and clears out the buffer. 
-func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, error) { +func (b *bulkIndexer) Flush(ctx context.Context) ([]BulkIndexerResponseItem, error) { if b.itemsAdded == 0 { - return MinimalBulkIndexerResponse{}, nil + return nil, nil } if b.gzipw != nil { if err := b.gzipw.Close(); err != nil { - return MinimalBulkIndexerResponse{}, fmt.Errorf( - "failed closing the gzip writer: %w", err, - ) + return nil, fmt.Errorf("failed closing the gzip writer: %w", err) } } @@ -171,7 +164,7 @@ func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, er bytesFlushed := b.buf.Len() res, err := req.Do(ctx, b.client) if err != nil { - return MinimalBulkIndexerResponse{}, err + return nil, err } defer res.Body.Close() // Record the number of flushed bytes only when err == nil. The body may @@ -179,13 +172,39 @@ func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, er b.bytesFlushed = bytesFlushed if res.IsError() { if res.StatusCode == http.StatusTooManyRequests { - return MinimalBulkIndexerResponse{}, errorTooManyRequests{res: res} + return nil, errorTooManyRequests{res: res} } - return MinimalBulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String()) + return nil, fmt.Errorf("flush failed: %s", res.String()) } - if err := json.NewDecoder(res.Body).Decode(&b.resp); err != nil { - return MinimalBulkIndexerResponse{}, fmt.Errorf("error decoding bulk response: %w", err) + + rspBody, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) } + + gjson.GetBytes(rspBody, "items").ForEach(func(key, value gjson.Result) bool { + value.ForEach(func(key, value gjson.Result) bool { + item := BulkIndexerResponseItem{ + Index: value.Get("_index").String(), + Status: int(value.Get("status").Int()), + } + + if e := value.Get("error"); e.Exists() { + item.Error = struct { + Type string `json:"type"` + Reason string `json:"reason"` + }{ + Type: 
e.Get("type").String(), + Reason: e.Get("reason").String(), + } + } + + b.resp = append(b.resp, item) + return true + }) + return true + }) + return b.resp, nil } diff --git a/go.mod b/go.mod index eb2b9b3..87de49a 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.19 require ( github.com/elastic/go-elasticsearch/v8 v8.8.2 - github.com/goccy/go-json v0.10.2 github.com/stretchr/testify v1.8.4 + github.com/tidwall/gjson v1.14.4 go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3 go.elastic.co/apm/module/apmzap/v2 v2.4.3 go.elastic.co/apm/v2 v2.4.3 @@ -25,6 +25,8 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect go.elastic.co/apm/module/apmhttp/v2 v2.4.3 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect diff --git a/go.sum b/go.sum index 966ae79..5098639 100644 --- a/go.sum +++ b/go.sum @@ -13,8 +13,6 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6 github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU= github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0= github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss= -github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= -github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -37,6 +35,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM= +github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3 h1:ahFfYDBPNq6G0Vsqncg3cs0HwcEQ76yKdhQeKKvGPck= go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3/go.mod h1:2kByxP0WjwX/9vRTKGAlqctTr7tQLN7hSfWPNLQJ+eM= go.elastic.co/apm/module/apmhttp/v2 v2.4.3 h1:bBqbbtQSEL+uVpH5CS656E9x6pXha8kkZ468/G0T5Eo=