From 2debb33f474c2315ee6699d92b19b65cf793bb16 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 23 Oct 2024 16:48:32 +0800 Subject: [PATCH] `elasticsearch.events.retried`: Add retry dimension This commit adds a new dimension called `greatest_retry` to the `elasticsearch.events.retried` metric. It represents the greatest observed retry count for a bulk request. This is useful to understand how many times documents in a bulk request have been retried. Signed-off-by: Marc Lopez Rubio --- appender.go | 4 +++- appender_test.go | 14 +++++++++++--- bulk_indexer.go | 13 +++++++++++-- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/appender.go b/appender.go index 615127c..a41feea 100644 --- a/appender.go +++ b/appender.go @@ -449,7 +449,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error { } if resp.RetriedDocs > 0 { // docs are scheduled to be retried but not yet failed due to retry limit - a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried) + a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried, + metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)), + ) } if docsIndexed > 0 { a.addCount(docsIndexed, &a.docsIndexed, diff --git a/appender_test.go b/appender_test.go index 4a882b3..646efa9 100644 --- a/appender_test.go +++ b/appender_test.go @@ -329,7 +329,11 @@ loop: case "elasticsearch.events.processed": assertProcessedCounter(m, indexerAttrs) case "elasticsearch.events.retried": - assertCounter(m, 1, indexerAttrs) + assertCounter(m, 1, attribute.NewSet( + attribute.String("a", "b"), + attribute.String("c", "d"), + attribute.Int("greatest_retry", 1), + )) case "elasticsearch.bulk_requests.available": assertCounter(m, stats.AvailableBulkRequests, indexerAttrs) case "elasticsearch.flushed.bytes": @@ -1046,7 +1050,9 @@ func TestAppenderRetryDocument(t *testing.T) { docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { switch m.Name { case 
"elasticsearch.events.retried": - assertCounter(m, 5, *attribute.EmptySet()) + assertCounter(m, 5, attribute.NewSet( + attribute.Int("greatest_retry", 1), + )) } }) assert.Equal(t, int64(1), asserted.Load()) @@ -1064,7 +1070,9 @@ func TestAppenderRetryDocument(t *testing.T) { docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) { switch m.Name { case "elasticsearch.events.retried": - assertCounter(m, 5, *attribute.EmptySet()) + assertCounter(m, 5, attribute.NewSet( + attribute.Int("greatest_retry", 2), + )) } }) assert.Equal(t, int64(2), asserted.Load()) diff --git a/bulk_indexer.go b/bulk_indexer.go index 6b6a4dd..80044ba 100644 --- a/bulk_indexer.go +++ b/bulk_indexer.go @@ -101,9 +101,15 @@ type BulkIndexer struct { } type BulkIndexerResponseStat struct { - Indexed int64 + // Indexed contains the total number of successfully indexed documents. + Indexed int64 + // RetriedDocs contains the total number of retried documents. RetriedDocs int64 - FailedDocs []BulkIndexerResponseItem + // GreatestRetry contains the greatest observed retry count in the entire + // bulk request. + GreatestRetry int + // FailedDocs contains the failed documents. + FailedDocs []BulkIndexerResponseItem } // BulkIndexerResponseItem represents the Elasticsearch response item. @@ -483,6 +489,9 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error tmp = append(tmp, res) continue } + if resp.GreatestRetry < count { + resp.GreatestRetry = count + } // Since some items may have succeeded, counter positions need // to be updated to match the next current buffer position.