perf: replace goccy with gjson and store response items in a slice (#43)
* perf: replace goccy with gjson and store response items in a slice

We're not using the map keys and just iterating on the values so
there is no need for a nested struct.
We can just flatten the maps and use a slice.
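As a rough illustration of the flattening described above, the sketch below decodes the nested `items` shape with the standard library and copies the values into a plain slice. The `item` type, the `flatten` helper and the sample body are hypothetical stand-ins, not the code from this commit (which parses with gjson instead).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// item is a hypothetical, trimmed-down response item.
type item struct {
	Index  string `json:"_index"`
	Status int    `json:"status"`
}

// flatten appends every value of the nested maps to dst and drops the
// action keys ("create", "index", ...), which the caller never reads.
func flatten(nested []map[string]item, dst []item) []item {
	for _, actions := range nested {
		for _, it := range actions {
			dst = append(dst, it)
		}
	}
	return dst
}

func main() {
	// Sample body in the shape of a _bulk response: one single-key map per item.
	body := []byte(`{"items":[{"create":{"_index":"logs","status":201}},{"create":{"_index":"logs","status":429}}]}`)

	var resp struct {
		Items []map[string]item `json:"items"`
	}
	if err := json.Unmarshal(body, &resp); err != nil {
		panic(err)
	}

	for _, it := range flatten(resp.Items, nil) {
		fmt.Println(it.Index, it.Status)
	}
}
```

Passing an existing slice as `dst` lets the caller reuse its backing array across calls.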

Replace go-json with gjson and store the item responses in a slice.

The slice was already being reused, so there are no additional
allocations.

This significantly reduces memory usage and allocations.

                                      │  before.txt  │     after-nojson-perf-final.txt     │
                                      │    sec/op    │    sec/op     vs base               │
Appender/NoCompression-20               503.8n ±  6%   476.1n ±  9%  -5.49% (p=0.009 n=10)
Appender/NoCompressionScaling-20        874.7n ± 15%   848.9n ± 42%       ~ (p=0.305 n=10)
Appender/BestSpeed-20                   793.3n ±  4%   844.8n ±  5%  +6.49% (p=0.003 n=10)
Appender/BestSpeedScaling-20            789.6n ±  4%   775.7n ±  7%       ~ (p=0.075 n=10)
Appender/DefaultCompression-20          1.489µ ±  3%   1.450µ ±  4%  -2.62% (p=0.003 n=10)
Appender/DefaultCompressionScaling-20   1.469µ ±  4%   1.487µ ±  6%       ~ (p=0.739 n=10)
Appender/BestCompression-20             1.493µ ±  4%   1.473µ ±  7%       ~ (p=0.066 n=10)
Appender/BestCompressionScaling-20      1.496µ ±  3%   1.460µ ±  4%       ~ (p=0.065 n=10)
geomean                                 1.038µ         1.026µ        -1.16%

                                      │  before.txt   │     after-nojson-perf-final.txt      │
                                      │      B/s      │      B/s       vs base               │
Appender/NoCompression-20               253.6Mi ±  6%   268.4Mi ±  8%  +5.82% (p=0.009 n=10)
Appender/NoCompressionScaling-20        146.1Mi ± 13%   150.6Mi ± 74%       ~ (p=0.315 n=10)
Appender/BestSpeed-20                   161.1Mi ±  3%   151.3Mi ±  5%  -6.09% (p=0.003 n=10)
Appender/BestSpeedScaling-20            161.8Mi ±  4%   164.8Mi ±  8%       ~ (p=0.075 n=10)
Appender/DefaultCompression-20          85.83Mi ±  3%   88.14Mi ±  4%  +2.69% (p=0.004 n=10)
Appender/DefaultCompressionScaling-20   87.01Mi ±  3%   85.94Mi ±  7%       ~ (p=0.739 n=10)
Appender/BestCompression-20             85.61Mi ±  4%   86.81Mi ±  7%       ~ (p=0.063 n=10)
Appender/BestCompressionScaling-20      85.44Mi ±  3%   87.51Mi ±  4%       ~ (p=0.065 n=10)
geomean                                 123.1Mi         124.6Mi        +1.18%

                                      │  before.txt  │    after-nojson-perf-final.txt     │
                                      │     B/op     │    B/op     vs base                │
Appender/NoCompression-20                1016.5 ± 0%   333.0 ± 0%  -67.24% (p=0.000 n=10)
Appender/NoCompressionScaling-20         1026.0 ± 1%   333.0 ± 4%  -67.54% (p=0.000 n=10)
Appender/BestSpeed-20                    1044.0 ± 1%   708.5 ± 3%  -32.14% (p=0.000 n=10)
Appender/BestSpeedScaling-20             1037.0 ± 1%   695.5 ± 2%  -32.93% (p=0.000 n=10)
Appender/DefaultCompression-20           1022.5 ± 1%   666.5 ± 6%  -34.82% (p=0.000 n=10)
Appender/DefaultCompressionScaling-20    1030.0 ± 1%   694.5 ± 5%  -32.57% (p=0.000 n=10)
Appender/BestCompression-20              1036.5 ± 2%   696.0 ± 2%  -32.85% (p=0.000 n=10)
Appender/BestCompressionScaling-20       1021.5 ± 2%   699.5 ± 4%  -31.52% (p=0.000 n=10)
geomean                                 1.005Ki        577.2       -43.92%

                                      │ before.txt │    after-nojson-perf-final.txt     │
                                      │ allocs/op  │ allocs/op   vs base                │
Appender/NoCompression-20               6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/NoCompressionScaling-20        6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/BestSpeed-20                   6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/BestSpeedScaling-20            6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/DefaultCompression-20          6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/DefaultCompressionScaling-20   6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/BestCompression-20             6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
Appender/BestCompressionScaling-20      6.000 ± 0%   2.000 ± 0%  -66.67% (p=0.000 n=10)
geomean                                 6.000        2.000       -66.67%

* refactor: wrap read error and remove comments

* refactor: remove minimal bulk response and use slice everywhere
kruskall authored Jul 10, 2023
1 parent 1a0e911 commit 31c7e23
Showing 4 changed files with 70 additions and 47 deletions.
50 changes: 24 additions & 26 deletions appender.go
@@ -269,34 +269,32 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
 		return err
 	}
 	var docsFailed, docsIndexed, tooManyRequests, clientFailed, serverFailed int64
-	for _, item := range resp.Items {
-		for _, info := range item {
-			if info.Error.Type != "" || info.Status > 201 {
-				docsFailed++
-				if info.Status >= 400 && info.Status < 500 {
-					if info.Status == http.StatusTooManyRequests {
-						tooManyRequests++
-					} else {
-						clientFailed++
-					}
-				}
-				if info.Status >= 500 {
-					serverFailed++
-				}
-				// NOTE(axw) error type and reason are included
-				// in the error message so we can observe different
-				// error types/reasons when logging is rate limited.
-				logger.Error(fmt.Sprintf(
-					"failed to index document in '%s' (%s): %s",
-					info.Index, info.Error.Type, info.Error.Reason,
-				))
-
-				if a.tracingEnabled() {
-					apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send()
+	for _, info := range resp {
+		if info.Error.Type != "" || info.Status > 201 {
+			docsFailed++
+			if info.Status >= 400 && info.Status < 500 {
+				if info.Status == http.StatusTooManyRequests {
+					tooManyRequests++
+				} else {
+					clientFailed++
 				}
-			} else {
-				docsIndexed++
 			}
+			if info.Status >= 500 {
+				serverFailed++
+			}
+			// NOTE(axw) error type and reason are included
+			// in the error message so we can observe different
+			// error types/reasons when logging is rate limited.
+			logger.Error(fmt.Sprintf(
+				"failed to index document in '%s' (%s): %s",
+				info.Index, info.Error.Type, info.Error.Reason,
+			))
+
+			if a.tracingEnabled() {
+				apm.CaptureError(ctx, errors.New(info.Error.Reason)).Send()
+			}
+		} else {
+			docsIndexed++
 		}
 	}
 	if docsFailed > 0 {
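The classification performed by the loop in `flush` can be read in isolation. The sketch below mirrors its conditions on a hypothetical `tally` type; the type, the `record` method and the sample error type strings are illustrative and not part of the commit.

```go
package main

import (
	"fmt"
	"net/http"
)

// tally is a hypothetical stand-in for the counters declared in flush.
type tally struct {
	indexed, failed, tooManyRequests, clientFailed, serverFailed int64
}

// record classifies one response item with the same conditions as the loop:
// an item fails if it carries an error type or a status above 201.
func (t *tally) record(status int, errType string) {
	if errType == "" && status <= http.StatusCreated {
		t.indexed++
		return
	}
	t.failed++
	switch {
	case status == http.StatusTooManyRequests:
		t.tooManyRequests++
	case status >= 400 && status < 500:
		t.clientFailed++
	case status >= 500:
		t.serverFailed++
	}
}

func main() {
	var t tally
	t.record(200, "")
	t.record(201, "")
	t.record(429, "es_rejected_execution_exception")
	t.record(400, "mapper_parsing_exception")
	t.record(503, "unavailable_shards_exception")
	fmt.Printf("%+v\n", t)
}
```

Note that 429 responses count as failed and as too many requests, but not as client failures, matching the nested `if`/`else` in the diff.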
55 changes: 37 additions & 18 deletions bulk_indexer.go
@@ -29,7 +29,7 @@ import (
 
 	"github.com/elastic/go-elasticsearch/v8"
 	"github.com/elastic/go-elasticsearch/v8/esapi"
-	"github.com/goccy/go-json"
+	"github.com/tidwall/gjson"
 )
 
 // At the time of writing, the go-elasticsearch BulkIndexer implementation
@@ -54,12 +54,7 @@ type bulkIndexer struct {
 	copybuf [32 * 1024]byte
 	writer  io.Writer
 	buf     bytes.Buffer
-	resp    MinimalBulkIndexerResponse
-}
-
-// MinimalBulkIndexerResponse represents a minimal subset of the Elasticsearch _bulk API response.
-type MinimalBulkIndexerResponse struct {
-	Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
+	resp    []BulkIndexerResponseItem
 }
 
 // BulkIndexerResponseItem represents the Elasticsearch response item.
@@ -92,7 +87,7 @@ func (b *bulkIndexer) Reset() {
 	if b.gzipw != nil {
 		b.gzipw.Reset(&b.buf)
 	}
-	b.resp = MinimalBulkIndexerResponse{Items: b.resp.Items[:0]}
+	b.resp = b.resp[:0]
 }
 
 // Added returns the number of buffered items.
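Reslicing to `[:0]`, as `Reset` does, sets the length to zero but keeps the backing array, so later appends can reuse the memory. A minimal sketch with hypothetical `indexer` and `responseItem` types:

```go
package main

import "fmt"

// responseItem and indexer are hypothetical stand-ins for the real types.
type responseItem struct {
	Index  string
	Status int
}

type indexer struct {
	resp []responseItem
}

// reset truncates the slice without releasing its backing array.
func (b *indexer) reset() {
	b.resp = b.resp[:0]
}

func main() {
	b := &indexer{resp: make([]responseItem, 0, 8)}
	b.resp = append(b.resp,
		responseItem{Index: "logs", Status: 201},
		responseItem{Index: "logs", Status: 429},
	)

	b.reset()
	fmt.Println(len(b.resp), cap(b.resp)) // 0 8

	// This append fits in the existing capacity, so it does not allocate.
	b.resp = append(b.resp, responseItem{Index: "metrics", Status: 201})
	fmt.Println(len(b.resp), cap(b.resp)) // 1 8
}
```

One consequence of this pattern: a slice handed out before the reset shares the same backing array, so its contents are overwritten by later appends. Callers would need to finish with a result before the next reset.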
@@ -151,15 +146,13 @@ func (b *bulkIndexer) writeMeta(index, action, documentID string) {
 }
 
 // Flush executes a bulk request if there are any items buffered, and clears out the buffer.
-func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, error) {
+func (b *bulkIndexer) Flush(ctx context.Context) ([]BulkIndexerResponseItem, error) {
 	if b.itemsAdded == 0 {
-		return MinimalBulkIndexerResponse{}, nil
+		return nil, nil
 	}
 	if b.gzipw != nil {
 		if err := b.gzipw.Close(); err != nil {
-			return MinimalBulkIndexerResponse{}, fmt.Errorf(
-				"failed closing the gzip writer: %w", err,
-			)
+			return nil, fmt.Errorf("failed closing the gzip writer: %w", err)
 		}
 	}
 
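Returning a nil slice on the early exits is safe for callers that only range over the result, because ranging over a nil slice runs zero iterations. A small sketch, where `flush` and `countIndexed` are hypothetical functions and not the ones in this file:

```go
package main

import "fmt"

// responseItem is a hypothetical, trimmed-down response item.
type responseItem struct {
	Status int
}

// flush returns a nil slice when nothing is buffered, like the early
// return in the diff; otherwise it fabricates successful items.
func flush(buffered int) ([]responseItem, error) {
	if buffered == 0 {
		return nil, nil
	}
	items := make([]responseItem, buffered)
	for i := range items {
		items[i].Status = 201
	}
	return items, nil
}

// countIndexed ranges over the result without a nil check.
func countIndexed(items []responseItem) int {
	n := 0
	for _, it := range items {
		if it.Status <= 201 {
			n++
		}
	}
	return n
}

func main() {
	items, err := flush(0)
	fmt.Println(items == nil, len(items), err, countIndexed(items)) // true 0 <nil> 0
}
```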
@@ -171,21 +164,47 @@ func (b *bulkIndexer) Flush(ctx context.Context) (MinimalBulkIndexerResponse, er
 	bytesFlushed := b.buf.Len()
 	res, err := req.Do(ctx, b.client)
 	if err != nil {
-		return MinimalBulkIndexerResponse{}, err
+		return nil, err
 	}
 	defer res.Body.Close()
 	// Record the number of flushed bytes only when err == nil. The body may
 	// not have been sent otherwise.
 	b.bytesFlushed = bytesFlushed
 	if res.IsError() {
 		if res.StatusCode == http.StatusTooManyRequests {
-			return MinimalBulkIndexerResponse{}, errorTooManyRequests{res: res}
+			return nil, errorTooManyRequests{res: res}
 		}
-		return MinimalBulkIndexerResponse{}, fmt.Errorf("flush failed: %s", res.String())
+		return nil, fmt.Errorf("flush failed: %s", res.String())
 	}
-	if err := json.NewDecoder(res.Body).Decode(&b.resp); err != nil {
-		return MinimalBulkIndexerResponse{}, fmt.Errorf("error decoding bulk response: %w", err)
+
+	rspBody, err := io.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("error reading response body: %w", err)
 	}
+
+	gjson.GetBytes(rspBody, "items").ForEach(func(key, value gjson.Result) bool {
+		value.ForEach(func(key, value gjson.Result) bool {
+			item := BulkIndexerResponseItem{
+				Index:  value.Get("_index").String(),
+				Status: int(value.Get("status").Int()),
+			}
+
+			if e := value.Get("error"); e.Exists() {
+				item.Error = struct {
+					Type   string `json:"type"`
+					Reason string `json:"reason"`
+				}{
+					Type:   e.Get("type").String(),
+					Reason: e.Get("reason").String(),
+				}
+			}
+
+			b.resp = append(b.resp, item)
+			return true
+		})
+		return true
+	})
+
 	return b.resp, nil
 }
 
4 changes: 3 additions & 1 deletion go.mod
@@ -4,8 +4,8 @@ go 1.19
 
 require (
 	github.com/elastic/go-elasticsearch/v8 v8.8.2
-	github.com/goccy/go-json v0.10.2
 	github.com/stretchr/testify v1.8.4
+	github.com/tidwall/gjson v1.14.4
 	go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3
 	go.elastic.co/apm/module/apmzap/v2 v2.4.3
 	go.elastic.co/apm/v2 v2.4.3
@@ -25,6 +25,8 @@ require (
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/procfs v0.7.3 // indirect
+	github.com/tidwall/match v1.1.1 // indirect
+	github.com/tidwall/pretty v1.2.0 // indirect
 	go.elastic.co/apm/module/apmhttp/v2 v2.4.3 // indirect
 	go.uber.org/atomic v1.7.0 // indirect
 	go.uber.org/multierr v1.6.0 // indirect
8 changes: 6 additions & 2 deletions go.sum
@@ -13,8 +13,6 @@ github.com/elastic/go-sysinfo v1.7.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6
 github.com/elastic/go-windows v1.0.0/go.mod h1:TsU0Nrp7/y3+VwE82FoZF8gC/XFg/Elz6CcloAxnPgU=
 github.com/elastic/go-windows v1.0.1 h1:AlYZOldA+UJ0/2nBuqWdo90GFCgG9xuyw9SYzGUtJm0=
 github.com/elastic/go-windows v1.0.1/go.mod h1:FoVvqWSun28vaDQPbj2Elfc0JahhPB7WQEGa3c814Ss=
-github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
-github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
 github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
@@ -37,6 +35,12 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM=
+github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
+github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
+github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
+github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
+github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
 go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3 h1:ahFfYDBPNq6G0Vsqncg3cs0HwcEQ76yKdhQeKKvGPck=
 go.elastic.co/apm/module/apmelasticsearch/v2 v2.4.3/go.mod h1:2kByxP0WjwX/9vRTKGAlqctTr7tQLN7hSfWPNLQJ+eM=
 go.elastic.co/apm/module/apmhttp/v2 v2.4.3 h1:bBqbbtQSEL+uVpH5CS656E9x6pXha8kkZ468/G0T5Eo=