Skip to content

Commit

Permalink
elasticsearch.events.retried: Add retry dimension (#209)
Browse files Browse the repository at this point in the history
This commit adds a new dimension to the `elasticsearch.events.retried`
metric called `greatest_retry`. It represents the greatest observed
retry count for a bulk request. This is useful to understand how many
times documents in a bulk request have been retried.

Signed-off-by: Marc Lopez Rubio <[email protected]>
  • Loading branch information
marclop authored Oct 23, 2024
1 parent 786add0 commit ac0d424
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 6 deletions.
4 changes: 3 additions & 1 deletion appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ func (a *Appender) flush(ctx context.Context, bulkIndexer *BulkIndexer) error {
}
if resp.RetriedDocs > 0 {
// docs are scheduled to be retried but not yet failed due to retry limit
a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried)
a.addCount(resp.RetriedDocs, nil, a.metrics.docsRetried,
metric.WithAttributes(attribute.Int("greatest_retry", resp.GreatestRetry)),
)
}
if docsIndexed > 0 {
a.addCount(docsIndexed, &a.docsIndexed,
Expand Down
14 changes: 11 additions & 3 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,11 @@ loop:
case "elasticsearch.events.processed":
assertProcessedCounter(m, indexerAttrs)
case "elasticsearch.events.retried":
assertCounter(m, 1, indexerAttrs)
assertCounter(m, 1, attribute.NewSet(
attribute.String("a", "b"),
attribute.String("c", "d"),
attribute.Int("greatest_retry", 1),
))
case "elasticsearch.bulk_requests.available":
assertCounter(m, stats.AvailableBulkRequests, indexerAttrs)
case "elasticsearch.flushed.bytes":
Expand Down Expand Up @@ -1046,7 +1050,9 @@ func TestAppenderRetryDocument(t *testing.T) {
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
switch m.Name {
case "elasticsearch.events.retried":
assertCounter(m, 5, *attribute.EmptySet())
assertCounter(m, 5, attribute.NewSet(
attribute.Int("greatest_retry", 1),
))
}
})
assert.Equal(t, int64(1), asserted.Load())
Expand All @@ -1064,7 +1070,9 @@ func TestAppenderRetryDocument(t *testing.T) {
docappendertest.AssertOTelMetrics(t, rm.ScopeMetrics[0].Metrics, func(m metricdata.Metrics) {
switch m.Name {
case "elasticsearch.events.retried":
assertCounter(m, 5, *attribute.EmptySet())
assertCounter(m, 5, attribute.NewSet(
attribute.Int("greatest_retry", 2),
))
}
})
assert.Equal(t, int64(2), asserted.Load())
Expand Down
13 changes: 11 additions & 2 deletions bulk_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,15 @@ type BulkIndexer struct {
}

type BulkIndexerResponseStat struct {
Indexed int64
// Indexed contains the total number of successfully indexed documents.
Indexed int64
// RetriedDocs contains the total number of retried documents.
RetriedDocs int64
FailedDocs []BulkIndexerResponseItem
// GreatestRetry contains the greatest observed retry count in the entire
// bulk request.
GreatestRetry int
// FailedDocs contains the failed documents.
FailedDocs []BulkIndexerResponseItem
}

// BulkIndexerResponseItem represents the Elasticsearch response item.
Expand Down Expand Up @@ -483,6 +489,9 @@ func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error
tmp = append(tmp, res)
continue
}
if resp.GreatestRetry < count {
resp.GreatestRetry = count
}

// Since some items may have succeeded, counter positions need
// to be updated to match the next current buffer position.
Expand Down

0 comments on commit ac0d424

Please sign in to comment.