From 5d8f20fdeeb8e890970a2c7fbe422fbb57277443 Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Mon, 7 Aug 2023 18:51:24 +0900 Subject: [PATCH 1/8] feat: use sync counter instead of async --- appender.go | 59 ++++++++++++++++++++++++++++++++++++------------ appender_test.go | 35 ++++++++++++++++++++-------- metric.go | 35 +++++----------------------- 3 files changed, 76 insertions(+), 53 deletions(-) diff --git a/appender.go b/appender.go index b78ef25..ffa72bc 100644 --- a/appender.go +++ b/appender.go @@ -82,11 +82,21 @@ type Appender struct { errgroupContext context.Context cancelErrgroupContext context.CancelFunc telemetryAttrs attribute.Set + processedEventAttrSet map[ProcessedStatus]attribute.Set metrics metrics mu sync.Mutex closed chan struct{} } +type ProcessedStatus string + +const ( + Success ProcessedStatus = "Success" + FailedClient ProcessedStatus = "FailedClient" + FailedServer ProcessedStatus = "FailedServer" + TooMany ProcessedStatus = "TooMany" +) + // New returns a new Appender that indexes documents into Elasticsearch. func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { @@ -125,6 +135,12 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { } } + processedEventAttrSet := map[ProcessedStatus]attribute.Set{ + Success: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "Success"))...), + FailedClient: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "FailedClient"))...), + FailedServer: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "FailedServer"))...), + TooMany: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "TooMany"))...), + } ms, err := newMetrics(cfg) if err != nil { return nil, err @@ -137,12 +153,13 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { cfg.Logger = zap.NewNop() } indexer := &Appender{ - config: cfg, - available: available, - closed: make(chan struct{}), - bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize), - metrics: ms, - telemetryAttrs: cfg.MetricAttributes, + config: cfg, + available: available, + closed: make(chan struct{}), + bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize), + metrics: *ms, + telemetryAttrs: cfg.MetricAttributes, + processedEventAttrSet: processedEventAttrSet, } indexer.addCount(int64(len(available)), &indexer.availableBulkRequests, ms.availableBulkRequests) @@ -235,11 +252,25 @@ func (a *Appender) Add(ctx context.Context, index string, document io.Reader) er return ErrClosed case a.bulkItems <- item: } - a.addCount(1, &a.docsAdded, a.metrics.docsAdded) + a.addProcessedCount(1, &a.docsAdded, a.metrics.docsAdded, Success) a.addCount(1, &a.docsActive, a.metrics.docsActive) return nil } +func (a *Appender) addProcessedCount(delta int64, lm *int64, m metric.Int64Counter, status ProcessedStatus) { + // legacy metric + atomic.AddInt64(lm, delta) + + attr, exist := a.processedEventAttrSet[status] + if !exist { + // warning + fmt.Println("UNKNOwN STATUS", attr) + return + } + + m.Add(context.Background(), delta, metric.WithAttributeSet(attr)) +} + func (a *Appender) addCount(delta int64, lm *int64, m metric.Int64Counter) { // legacy metric atomic.AddInt64(lm, delta) @@ -275,7 +306,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { a.addCount(int64(flushed), &a.bytesTotal, a.metrics.bytesTotal) } if err != nil { - a.addCount(int64(n), &a.docsFailed, a.metrics.docsFailed) + atomic.AddInt64(&a.docsFailed, int64(n)) logger.Error("bulk indexing request failed", zap.Error(err)) if a.tracingEnabled() { apm.CaptureError(ctx, err).Send() @@ -284,7 +315,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { var errTooMany errorTooManyRequests // 429 may be returned as errors from the bulk indexer. if errors.As(err, &errTooMany) { - a.addCount(int64(n), &a.tooManyRequests, a.metrics.tooManyRequests) + a.addProcessedCount(int64(n), &a.tooManyRequests, a.metrics.docsIndexed, TooMany) } return err } @@ -319,19 +350,19 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { } } if docsFailed > 0 { - a.addCount(docsFailed, &a.docsFailed, a.metrics.docsFailed) + atomic.AddInt64(&a.docsFailed, int64(n)) } if docsIndexed > 0 { - a.addCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed) + a.addProcessedCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed, Success) } if tooManyRequests > 0 { - a.addCount(tooManyRequests, &a.tooManyRequests, a.metrics.tooManyRequests) + a.addProcessedCount(tooManyRequests, &a.tooManyRequests, a.metrics.docsIndexed, TooMany) } if clientFailed > 0 { - a.addCount(clientFailed, &a.docsFailedClient, a.metrics.docsFailedClient) + a.addProcessedCount(clientFailed, &a.docsFailedClient, a.metrics.docsIndexed, FailedClient) } if serverFailed > 0 { - a.addCount(serverFailed, &a.docsFailedServer, a.metrics.docsFailedServer) + a.addProcessedCount(serverFailed, &a.docsFailedServer, a.metrics.docsIndexed, FailedServer) } logger.Debug( "bulk request completed", diff --git a/appender_test.go b/appender_test.go index 028f244..9b9d280 100644 --- a/appender_test.go +++ b/appender_test.go @@ -36,6 +36,7 @@ import ( "go.opentelemetry.io/otel/attribute" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -145,6 +146,28 @@ loop: assert.Equal(t, attrs, dp.Attributes) } } + + assertProcessedCounter := func(metric metricdata.Metrics, count int64, attrs attribute.Set) { + asserted++ + counter := metric.Data.(metricdata.Sum[int64]) + for _, dp := range counter.DataPoints { + metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, dp, attrs.ToSlice()...) + status, exist := dp.Attributes.Value(attribute.Key("status")) + assert.True(t, exist) + switch status.AsString() { + case "Success": + assert.Equal(t, stats.Indexed, dp.Value) + case "FailedClient": + assert.Equal(t, stats.FailedClient, dp.Value) + case "FailedServer": + assert.Equal(t, stats.FailedServer, dp.Value) + case "TooMany": + assert.Equal(t, stats.TooManyRequests, dp.Value) + default: + assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) + } + } + } // check the set of names and then check the counter or histogram unexpectedMetrics := []string{} for _, metric := range rm.ScopeMetrics[0].Metrics { @@ -155,16 +178,8 @@ loop: assertCounter(metric, stats.Active, indexerAttrs) case "elasticsearch.bulk_requests.count": assertCounter(metric, stats.BulkRequests, indexerAttrs) - case "elasticsearch.failed.count": - assertCounter(metric, stats.Failed, indexerAttrs) - case "elasticsearch.failed.client.count": - assertCounter(metric, stats.FailedClient, indexerAttrs) - case "elasticsearch.failed.server.count": - assertCounter(metric, stats.FailedServer, indexerAttrs) case "elasticsearch.events.processed": - assertCounter(metric, stats.Indexed, indexerAttrs) - case "elasticsearch.failed.too_many_reqs": - assertCounter(metric, stats.TooManyRequests, indexerAttrs) + assertProcessedCounter(metric, stats.Indexed, indexerAttrs) case "elasticsearch.bulk_requests.available": assertCounter(metric, stats.AvailableBulkRequests, indexerAttrs) case "elasticsearch.flushed.bytes": @@ -180,7 +195,7 @@ loop: } } assert.Empty(t, unexpectedMetrics) - assert.Equal(t, 10, asserted) + assert.Equal(t, 6, asserted) } func TestAppenderAvailableAppenders(t *testing.T) { diff --git a/metric.go b/metric.go index 075c7bb..5f55914 100644 --- a/metric.go +++ b/metric.go @@ -32,10 +32,7 @@ type metrics struct { docsAdded metric.Int64Counter docsActive metric.Int64Counter docsFailed metric.Int64Counter - docsFailedClient metric.Int64Counter - docsFailedServer metric.Int64Counter docsIndexed metric.Int64Counter - tooManyRequests metric.Int64Counter bytesTotal metric.Int64Counter availableBulkRequests metric.Int64Counter activeCreated metric.Int64Counter @@ -56,7 +53,7 @@ type counterMetric struct { p *metric.Int64Counter } -func newMetrics(cfg Config) (metrics, error) { +func newMetrics(cfg Config) (*metrics, error) { if cfg.MeterProvider == nil { cfg.MeterProvider = otel.GetMeterProvider() } @@ -80,7 +77,7 @@ func newMetrics(cfg Config) (metrics, error) { for _, m := range histograms { err := newFloat64Histogram(meter, m) if err != nil { - return ms, err + return &ms, err } } @@ -92,7 +89,7 @@ func newMetrics(cfg Config) (metrics, error) { }, { name: "elasticsearch.events.count", - description: "the total number of items added to the indexer.", + description: "Number of APM Events received for indexing", p: &ms.docsAdded, }, { @@ -100,31 +97,11 @@ func newMetrics(cfg Config) (metrics, error) { description: "the number of active items waiting in the indexer's queue.", p: &ms.docsActive, }, - { - name: "elasticsearch.failed.count", - description: "The amount of time a document was buffered for, in seconds.", - p: &ms.docsFailed, - }, - { - name: "elasticsearch.failed.client.count", - description: "The number of docs failed to get indexed with client error(status_code >= 400 < 500, but not 429).", - p: &ms.docsFailedClient, - }, - { - name: "elasticsearch.failed.server.count", - description: "The number of docs failed to get indexed with server error(status_code >= 500).", - p: &ms.docsFailedServer, - }, { name: "elasticsearch.events.processed", - description: "The number of docs indexed successfully.", + description: "Number of APM Events flushed to Elasticsearch. Dimensions are used to report the project ID, success or failures", p: &ms.docsIndexed, }, - { - name: "elasticsearch.failed.too_many_reqs", - description: "The number of 429 errors returned from the bulk indexer due to too many requests.", - p: &ms.tooManyRequests, - }, { name: "elasticsearch.flushed.bytes", description: "The total number of bytes written to the request body", @@ -150,11 +127,11 @@ func newMetrics(cfg Config) (metrics, error) { for _, m := range counters { err := newInt64Counter(meter, m) if err != nil { - return ms, err + return &ms, err } } - return ms, nil + return &ms, nil } func newInt64Counter(meter metric.Meter, c counterMetric) error { From 8dafa07ae86956b94502ba7e75e4ba4158854d7e Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 11:27:44 +0900 Subject: [PATCH 2/8] fix: add correct number --- appender.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/appender.go b/appender.go index ffa72bc..f707829 100644 --- a/appender.go +++ b/appender.go @@ -252,7 +252,7 @@ func (a *Appender) Add(ctx context.Context, index string, document io.Reader) er return ErrClosed case a.bulkItems <- item: } - a.addProcessedCount(1, &a.docsAdded, a.metrics.docsAdded, Success) + a.addCount(1, &a.docsAdded, a.metrics.docsAdded) a.addCount(1, &a.docsActive, a.metrics.docsActive) return nil } @@ -350,7 +350,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { } } if docsFailed > 0 { - atomic.AddInt64(&a.docsFailed, int64(n)) + atomic.AddInt64(&a.docsFailed, docsFailed) } if docsIndexed > 0 { a.addProcessedCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed, Success) From 9d5b53440080965b96e0a379aa24d86eeb9e1e54 Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 11:54:49 +0900 Subject: [PATCH 3/8] test: assert event.processed status --- appender.go | 43 ++++++++++++++++++------------------------- appender_test.go | 7 ++++++- 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/appender.go b/appender.go index f707829..7397239 100644 --- a/appender.go +++ b/appender.go @@ -38,12 +38,19 @@ import ( "golang.org/x/sync/errgroup" ) +type ProcessedStatus string + var ( // ErrClosed is returned from methods of closed Indexers. ErrClosed = errors.New("model indexer closed") errMissingIndex = errors.New("missing index name") errMissingBody = errors.New("missing document body") + + success ProcessedStatus = "Success" + failedClient ProcessedStatus = "FailedClient" + failedServer ProcessedStatus = "FailedServer" + tooMany ProcessedStatus = "TooMany" ) // Appender provides an append-only API for bulk indexing documents into Elasticsearch. @@ -88,15 +95,6 @@ type Appender struct { closed chan struct{} } -type ProcessedStatus string - -const ( - Success ProcessedStatus = "Success" - FailedClient ProcessedStatus = "FailedClient" - FailedServer ProcessedStatus = "FailedServer" - TooMany ProcessedStatus = "TooMany" -) - // New returns a new Appender that indexes documents into Elasticsearch. func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 { @@ -135,11 +133,12 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { } } + baseAttrs := cfg.MetricAttributes.ToSlice() processedEventAttrSet := map[ProcessedStatus]attribute.Set{ - Success: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "Success"))...), - FailedClient: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "FailedClient"))...), - FailedServer: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "FailedServer"))...), - TooMany: attribute.NewSet(append(cfg.MetricAttributes.ToSlice(), attribute.String("status", "TooMany"))...), + success: attribute.NewSet(append(baseAttrs, attribute.String("status", "Success"))...), + failedClient: attribute.NewSet(append(baseAttrs, attribute.String("status", "FailedClient"))...), + failedServer: attribute.NewSet(append(baseAttrs, attribute.String("status", "FailedServer"))...), + tooMany: attribute.NewSet(append(baseAttrs, attribute.String("status", "TooMany"))...), } ms, err := newMetrics(cfg) if err != nil { @@ -261,13 +260,7 @@ func (a *Appender) addProcessedCount(delta int64, lm *int64, m metric.Int64Count // legacy metric atomic.AddInt64(lm, delta) - attr, exist := a.processedEventAttrSet[status] - if !exist { - // warning - fmt.Println("UNKNOwN STATUS", attr) - return - } - + attr := a.processedEventAttrSet[status] m.Add(context.Background(), delta, metric.WithAttributeSet(attr)) } @@ -315,7 +308,7 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { var errTooMany errorTooManyRequests // 429 may be returned as errors from the bulk indexer. if errors.As(err, &errTooMany) { - a.addProcessedCount(int64(n), &a.tooManyRequests, a.metrics.docsIndexed, TooMany) + a.addProcessedCount(int64(n), &a.tooManyRequests, a.metrics.docsIndexed, tooMany) } return err } @@ -353,16 +346,16 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { atomic.AddInt64(&a.docsFailed, docsFailed) } if docsIndexed > 0 { - a.addProcessedCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed, Success) + a.addProcessedCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed, success) } if tooManyRequests > 0 { - a.addProcessedCount(tooManyRequests, &a.tooManyRequests, a.metrics.docsIndexed, TooMany) + a.addProcessedCount(tooManyRequests, &a.tooManyRequests, a.metrics.docsIndexed, tooMany) } if clientFailed > 0 { - a.addProcessedCount(clientFailed, &a.docsFailedClient, a.metrics.docsIndexed, FailedClient) + a.addProcessedCount(clientFailed, &a.docsFailedClient, a.metrics.docsIndexed, failedClient) } if serverFailed > 0 { - a.addProcessedCount(serverFailed, &a.docsFailedServer, a.metrics.docsIndexed, FailedServer) + a.addProcessedCount(serverFailed, &a.docsFailedServer, a.metrics.docsIndexed, failedServer) } logger.Debug( "bulk request completed", diff --git a/appender_test.go b/appender_test.go index 9b9d280..ef67324 100644 --- a/appender_test.go +++ b/appender_test.go @@ -147,6 +147,7 @@ loop: } } + var processedAsserted int assertProcessedCounter := func(metric metricdata.Metrics, count int64, attrs attribute.Set) { asserted++ counter := metric.Data.(metricdata.Sum[int64]) @@ -156,12 +157,16 @@ loop: assert.True(t, exist) switch status.AsString() { case "Success": + processedAsserted++ assert.Equal(t, stats.Indexed, dp.Value) case "FailedClient": + processedAsserted++ assert.Equal(t, stats.FailedClient, dp.Value) case "FailedServer": + processedAsserted++ assert.Equal(t, stats.FailedServer, dp.Value) case "TooMany": + processedAsserted++ assert.Equal(t, stats.TooManyRequests, dp.Value) default: assert.FailNow(t, "Unexpected metric with status: "+status.AsString()) @@ -196,6 +201,7 @@ loop: } assert.Empty(t, unexpectedMetrics) assert.Equal(t, 6, asserted) + assert.Equal(t, 4, processedAsserted) } func TestAppenderAvailableAppenders(t *testing.T) { @@ -622,7 +628,6 @@ func TestAppenderCloseInterruptAdd(t *testing.T) { defer cancel() go func() { added <- indexer.Add(addContext, "logs-foo-testing", readerFunc(func(p []byte) (int, error) { - fmt.Println("hello?") close(readInvoked) return copy(p, "{}"), nil })) From 828cd007d9cd95a34e329074c458a6fc9209dcef Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 12:07:34 +0900 Subject: [PATCH 4/8] chore: make lint happy --- metric.go | 1 - 1 file changed, 1 deletion(-) diff --git a/metric.go b/metric.go index 5f55914..c9f5c4d 100644 --- a/metric.go +++ b/metric.go @@ -31,7 +31,6 @@ type metrics struct { bulkRequests metric.Int64Counter docsAdded metric.Int64Counter docsActive metric.Int64Counter - docsFailed metric.Int64Counter docsIndexed metric.Int64Counter bytesTotal metric.Int64Counter availableBulkRequests metric.Int64Counter From 2a0837e822b8c3dca96444049d92f8b075d68dee Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 13:06:54 +0900 Subject: [PATCH 5/8] test: expect greater or equal than 2 flush events --- appender_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/appender_test.go b/appender_test.go index ef67324..b4c1211 100644 --- a/appender_test.go +++ b/appender_test.go @@ -411,7 +411,7 @@ func TestAppenderFlushMetric(t *testing.T) { if !ignoreCount { assert.Equal(t, count, int(dp.Count)) } else { - assert.Greater(t, int(dp.Count), count) + assert.GreaterOrEqual(t, int(dp.Count), count) } assert.Positive(t, dp.Sum) assert.Equal(t, attrs, dp.Attributes) From 26f6ec766a3d2dbdf931a9c037e11488cedf748b Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 13:36:58 +0900 Subject: [PATCH 6/8] chore: address review comments --- appender.go | 71 +++++++++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 37 deletions(-) diff --git a/appender.go b/appender.go index 7397239..f0dc4db 100644 --- a/appender.go +++ b/appender.go @@ -38,19 +38,12 @@ import ( "golang.org/x/sync/errgroup" ) -type ProcessedStatus string - var ( // ErrClosed is returned from methods of closed Indexers. ErrClosed = errors.New("model indexer closed") errMissingIndex = errors.New("missing index name") errMissingBody = errors.New("missing document body") - - success ProcessedStatus = "Success" - failedClient ProcessedStatus = "FailedClient" - failedServer ProcessedStatus = "FailedServer" - tooMany ProcessedStatus = "TooMany" ) // Appender provides an append-only API for bulk indexing documents into Elasticsearch. @@ -89,7 +82,6 @@ type Appender struct { errgroupContext context.Context cancelErrgroupContext context.CancelFunc telemetryAttrs attribute.Set - processedEventAttrSet map[ProcessedStatus]attribute.Set metrics metrics mu sync.Mutex closed chan struct{} @@ -133,13 +125,6 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { } } - baseAttrs := cfg.MetricAttributes.ToSlice() - processedEventAttrSet := map[ProcessedStatus]attribute.Set{ - success: attribute.NewSet(append(baseAttrs, attribute.String("status", "Success"))...), - failedClient: attribute.NewSet(append(baseAttrs, attribute.String("status", "FailedClient"))...), - failedServer: attribute.NewSet(append(baseAttrs, attribute.String("status", "FailedServer"))...), - tooMany: attribute.NewSet(append(baseAttrs, attribute.String("status", "TooMany"))...), - } ms, err := newMetrics(cfg) if err != nil { return nil, err @@ -152,13 +137,12 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { cfg.Logger = zap.NewNop() } indexer := &Appender{ - config: cfg, - available: available, - closed: make(chan struct{}), - bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize), - metrics: *ms, - telemetryAttrs: cfg.MetricAttributes, - processedEventAttrSet: processedEventAttrSet, + config: cfg, + available: available, + closed: make(chan struct{}), + bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize), + metrics: *ms, + telemetryAttrs: cfg.MetricAttributes, } indexer.addCount(int64(len(available)), &indexer.availableBulkRequests, ms.availableBulkRequests) @@ -256,20 +240,13 @@ func (a *Appender) Add(ctx context.Context, index string, document io.Reader) er return nil } -func (a *Appender) addProcessedCount(delta int64, lm *int64, m metric.Int64Counter, status ProcessedStatus) { - // legacy metric - atomic.AddInt64(lm, delta) - - attr := a.processedEventAttrSet[status] - m.Add(context.Background(), delta, metric.WithAttributeSet(attr)) -} - -func (a *Appender) addCount(delta int64, lm *int64, m metric.Int64Counter) { +func (a *Appender) addCount(delta int64, lm *int64, m metric.Int64Counter, opts ...metric.AddOption) { // legacy metric atomic.AddInt64(lm, delta) attrs := metric.WithAttributeSet(a.config.MetricAttributes) - m.Add(context.Background(), delta, attrs) + opts = append(opts, attrs) + m.Add(context.Background(), delta, opts...) } func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { @@ -308,7 +285,11 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { var errTooMany errorTooManyRequests // 429 may be returned as errors from the bulk indexer. if errors.As(err, &errTooMany) { - a.addProcessedCount(int64(n), &a.tooManyRequests, a.metrics.docsIndexed, tooMany) + a.addCount(int64(n), + &a.tooManyRequests, + a.metrics.docsIndexed, + metric.WithAttributes(attribute.String("status", "TooMany")), + ) } return err } @@ -346,16 +327,32 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *bulkIndexer) error { atomic.AddInt64(&a.docsFailed, docsFailed) } if docsIndexed > 0 { - a.addProcessedCount(docsIndexed, &a.docsIndexed, a.metrics.docsIndexed, success) + a.addCount(docsIndexed, + &a.docsIndexed, + a.metrics.docsIndexed, + metric.WithAttributes(attribute.String("status", "Success")), + ) } if tooManyRequests > 0 { - a.addProcessedCount(tooManyRequests, &a.tooManyRequests, a.metrics.docsIndexed, tooMany) + a.addCount(tooManyRequests, + &a.tooManyRequests, + a.metrics.docsIndexed, + metric.WithAttributes(attribute.String("status", "TooMany")), + ) } if clientFailed > 0 { - a.addProcessedCount(clientFailed, &a.docsFailedClient, a.metrics.docsIndexed, failedClient) + a.addCount(clientFailed, + &a.docsFailedClient, + a.metrics.docsIndexed, + metric.WithAttributes(attribute.String("status", "FailedClient")), + ) } if serverFailed > 0 { - a.addProcessedCount(serverFailed, &a.docsFailedServer, a.metrics.docsIndexed, failedServer) + a.addCount(serverFailed, + &a.docsFailedServer, + a.metrics.docsIndexed, + metric.WithAttributes(attribute.String("status", "FailedServer")), + ) } logger.Debug( "bulk request completed", From 9230918cab691af52dd15587b487ad68a79d200a Mon Sep 17 00:00:00 2001 From: kyungeunni Date: Tue, 8 Aug 2023 14:07:52 +0900 Subject: [PATCH 7/8] fix: revert returning pointer to a metric --- appender.go | 2 +- metric.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/appender.go b/appender.go index f0dc4db..005a3ec 100644 --- a/appender.go +++ b/appender.go @@ -141,7 +141,7 @@ func New(client *elasticsearch.Client, cfg Config) (*Appender, error) { available: available, closed: make(chan struct{}), bulkItems: make(chan bulkIndexerItem, cfg.DocumentBufferSize), - metrics: *ms, + metrics: ms, telemetryAttrs: cfg.MetricAttributes, } indexer.addCount(int64(len(available)), &indexer.availableBulkRequests, ms.availableBulkRequests) diff --git a/metric.go b/metric.go index c9f5c4d..bf64979 100644 --- a/metric.go +++ b/metric.go @@ -52,7 +52,7 @@ type counterMetric struct { p *metric.Int64Counter } -func newMetrics(cfg Config) (*metrics, error) { +func newMetrics(cfg Config) (metrics, error) { if cfg.MeterProvider == nil { cfg.MeterProvider = otel.GetMeterProvider() } @@ -76,7 +76,7 @@ func newMetrics(cfg Config) (*metrics, error) { for _, m := range histograms { err := newFloat64Histogram(meter, m) if err != nil { - return &ms, err + return ms, err } } @@ -126,11 +126,11 @@ func newMetrics(cfg Config) (*metrics, error) { for _, m := range counters { err := newInt64Counter(meter, m) if err != nil { - return &ms, err + return ms, err } } - return &ms, nil + return ms, nil } func newInt64Counter(meter metric.Meter, c counterMetric) error { From 108f8c2599773812e4ff3feb751dd1fbb429ef6e Mon Sep 17 00:00:00 2001 From: Kyungeun Kim Date: Tue, 8 Aug 2023 14:08:36 +0900 Subject: [PATCH 8/8] chore: update metric description Co-authored-by: Andrew Wilkins --- metric.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metric.go b/metric.go index bf64979..61b2048 100644 --- a/metric.go +++ b/metric.go @@ -98,7 +98,7 @@ func newMetrics(cfg Config) (metrics, error) { }, { name: "elasticsearch.events.processed", - description: "Number of APM Events flushed to Elasticsearch. Dimensions are used to report the project ID, success or failures", + description: "Number of APM Events flushed to Elasticsearch. Attributes are used to report separate counts for different outcomes - success, client failure, etc.", p: &ms.docsIndexed, }, {