Skip to content

Commit

Permalink
otelconsumer: remove temporary handling of entity too large (elastic#…
Browse files Browse the repository at this point in the history
…41911)

This code was initially added in elastic#41523 because of a limitation from the
elasticsearch exporter.

The elasticsearch exporter has been updated to enforce flush::max_bytes
for the batcher extension and will automatically split the batch if it
exceeds the limit.

This error is now fixed in the collector v0.115.0.

See open-telemetry/opentelemetry-collector-contrib#36396.
  • Loading branch information
mauri870 authored and michalpristas committed Dec 13, 2024
1 parent ac2c179 commit 907a9f3
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 52 deletions.
25 changes: 0 additions & 25 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package otelconsumer
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -103,30 +102,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
// If the batch is too large, the elasticsearchexporter will
// return a 413 error.
//
// At the moment, the exporter does not support batch splitting
// on error so we do it here.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
if strings.Contains(err.Error(), "Request Entity Too Large") {
// Try and split the batch into smaller batches and retry
if batch.SplitRetry() {
st.BatchSplit()
st.RetryableErrors(len(events))
} else {
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
st.PermanentErrors(len(events))
out.log.Errorf("the batch is too large to be sent: %v", err)
}

// Don't propagate the error, the batch was split and retried.
return nil
}

// Permanent errors shouldn't be retried. This tipically means
// the data cannot be serialized by the exporter that is attached
// to the pipeline or when the destination refuses the data because
Expand Down
27 changes: 0 additions & 27 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,33 +90,6 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("split batch on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
})

t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 2)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
})

t.Run("drop batch on permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down

0 comments on commit 907a9f3

Please sign in to comment.