diff --git a/.chloggen/rm-old-push-based-batcher.yaml b/.chloggen/rm-old-push-based-batcher.yaml new file mode 100644 index 000000000000..586636be23ef --- /dev/null +++ b/.chloggen/rm-old-push-based-batcher.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate alpha featuregate "exporter.UsePullingBasedExporterQueueBatcher". Functionality is enabled by default. + +# One or more tracking issues or pull requests related to the change +issues: [12233] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index dfebb0d1086b..5c24e8f5b5c9 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -25,11 +25,12 @@ import ( "go.opentelemetry.io/collector/pipeline" ) -var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister( +var _ = featuregate.GlobalRegistry().MustRegister( "exporter.UsePullingBasedExporterQueueBatcher", - featuregate.StageBeta, + featuregate.StageDeprecated, featuregate.WithRegisterFromVersion("v0.115.0"), - featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), + featuregate.WithRegisterToVersion("v0.119.0"), + featuregate.WithRegisterDescription("If set to true, turns on the pulling-based exporter queue batcher"), ) type ObsrepSenderFactory = func(obsrep *ObsReport) Sender[internal.Request] @@ -53,7 +54,6 @@ type BaseExporter struct { // Chain of senders that the exporter helper applies before passing the data to the actual exporter. // The data is handled by each sender in the respective order starting from the queueSender. // Most of the senders are optional, and initialized with a no-op path-through sender. 
- BatchSender Sender[internal.Request] QueueSender Sender[internal.Request] ObsrepSender Sender[internal.Request] RetrySender Sender[internal.Request] @@ -73,7 +73,6 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } be := &BaseExporter{ - BatchSender: &BaseSender[internal.Request]{}, QueueSender: &BaseSender[internal.Request]{}, ObsrepSender: osf(obsReport), RetrySender: &BaseSender[internal.Request]{}, @@ -101,23 +100,8 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) } - if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || - usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set) - be.BatchSender = bs - } - be.connectSenders() - if bs, ok := be.BatchSender.(*BatchSender); ok { - // If queue sender is enabled assign to the batch sender the same number of workers. - if qs, ok := be.QueueSender.(*QueueSender); ok { - bs.concurrencyLimit = int64(qs.numConsumers) - } - // Batcher sender mutates the data. - be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true})) - } - return be, nil } @@ -133,8 +117,7 @@ func (be *BaseExporter) Send(ctx context.Context, req internal.Request) error { // connectSenders connects the senders in the predefined order. func (be *BaseExporter) connectSenders() { - be.QueueSender.SetNextSender(be.BatchSender) - be.BatchSender.SetNextSender(be.ObsrepSender) + be.QueueSender.SetNextSender(be.ObsrepSender) be.ObsrepSender.SetNextSender(be.RetrySender) be.RetrySender.SetNextSender(be.TimeoutSender) } @@ -145,11 +128,6 @@ func (be *BaseExporter) Start(ctx context.Context, host component.Host) error { return err } - // If no error then start the BatchSender. 
- if err := be.BatchSender.Start(ctx, host); err != nil { - return err - } - // Last start the queueSender. return be.QueueSender.Start(ctx, host) } @@ -158,8 +136,6 @@ func (be *BaseExporter) Shutdown(ctx context.Context) error { return multierr.Combine( // First shutdown the retry sender, so the queue sender can flush the queue without retries. be.RetrySender.Shutdown(ctx), - // Then shutdown the batch sender - be.BatchSender.Shutdown(ctx), // Then shutdown the queue sender. be.QueueSender.Shutdown(ctx), // Last shutdown the wrapped exporter itself. diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index 12c56bb76a08..9d40284bf531 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -38,80 +38,52 @@ func newNoopObsrepSender(*ObsReport) Sender[internal.Request] { } func TestBaseExporter(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, be.Shutdown(context.Background())) } func TestBaseExporterWithOptions(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - want := 
errors.New("my error") - be, err := NewBaseExporter( - defaultSettings, defaultSignal, newNoopObsrepSender, - WithStart(func(context.Context, component.Host) error { return want }), - WithShutdown(func(context.Context) error { return want }), - WithTimeout(NewDefaultTimeoutConfig()), - ) - require.NoError(t, err) - require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) - require.Equal(t, want, be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + want := errors.New("my error") + be, err := NewBaseExporter( + defaultSettings, defaultSignal, newNoopObsrepSender, + WithStart(func(context.Context, component.Host) error { return want }), + WithShutdown(func(context.Context) error { return want }), + WithTimeout(NewDefaultTimeoutConfig()), + ) + require.NoError(t, err) + require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) + require.Equal(t, want, be.Shutdown(context.Background())) } func TestQueueOptionsWithRequestExporter(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithRetry(configretry.NewDefaultBackOffConfig())) - require.NoError(t, err) - require.Nil(t, bs.Marshaler) - require.Nil(t, bs.Unmarshaler) - _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) - require.Error(t, err) + bs, err := NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithRetry(configretry.NewDefaultBackOffConfig())) + require.NoError(t, err) + require.Nil(t, bs.Marshaler) + require.Nil(t, bs.Unmarshaler) + _, err = 
NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig())) + require.Error(t, err) - _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(configretry.NewDefaultBackOffConfig()), - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.Error(t, err) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + _, err = NewBaseExporter(exportertest.NewNopSettings(), defaultSignal, newNoopObsrepSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(configretry.NewDefaultBackOffConfig()), + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + require.Error(t, err) } func TestBaseExporterLogging(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.DebugLevel) - set.Logger = zap.New(logger) - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = false - bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg)) - require.NoError(t, err) - sendErr := bs.Send(context.Background(), newErrorRequest()) - require.Error(t, sendErr) + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = false + bs, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, WithRetry(rCfg)) + require.NoError(t, err) + sendErr := 
bs.Send(context.Background(), newErrorRequest()) + require.Error(t, sendErr) - require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + require.Len(t, observed.FilterLevelExact(zap.ErrorLevel).All(), 1) } diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go deleted file mode 100644 index 4cb3ace63b02..000000000000 --- a/exporter/exporterhelper/internal/batch_sender.go +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterbatcher" - "go.opentelemetry.io/collector/exporter/internal" -) - -// BatchSender is a component that places requests into batches before passing them to the downstream senders. -// Batches are sent out with any of the following conditions: -// - batch size reaches cfg.MinSizeItems -// - cfg.FlushTimeout is elapsed since the timestamp when the previous batch was sent out. -// - concurrencyLimit is reached. -type BatchSender struct { - BaseSender[internal.Request] - cfg exporterbatcher.Config - - // concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher. - // If this number is reached and all the goroutines are busy, the batch will be sent right away. - // Populated from the number of queue consumers if queue is enabled. 
- concurrencyLimit int64 - activeRequests atomic.Int64 - - mu sync.Mutex - activeBatch *batch - lastFlushed time.Time - - logger *zap.Logger - - shutdownCh chan struct{} - shutdownCompleteCh chan struct{} - stopped *atomic.Bool -} - -// newBatchSender returns a new batch consumer component. -func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender { - bs := &BatchSender{ - activeBatch: newEmptyBatch(), - cfg: cfg, - logger: set.Logger, - shutdownCh: nil, - shutdownCompleteCh: make(chan struct{}), - stopped: &atomic.Bool{}, - } - return bs -} - -func (bs *BatchSender) Start(_ context.Context, _ component.Host) error { - bs.shutdownCh = make(chan struct{}) - timer := time.NewTimer(bs.cfg.FlushTimeout) - go func() { - for { - select { - case <-bs.shutdownCh: - // There is a minimal chance that another request is added after the shutdown signal. - // This loop will handle that case. - for bs.activeRequests.Load() > 0 { - bs.mu.Lock() - if bs.activeBatch.request != nil { - bs.exportActiveBatch() - } - bs.mu.Unlock() - } - if !timer.Stop() { - <-timer.C - } - close(bs.shutdownCompleteCh) - return - case <-timer.C: - bs.mu.Lock() - nextFlush := bs.cfg.FlushTimeout - if bs.activeBatch.request != nil { - sinceLastFlush := time.Since(bs.lastFlushed) - if sinceLastFlush >= bs.cfg.FlushTimeout { - bs.exportActiveBatch() - } else { - nextFlush = bs.cfg.FlushTimeout - sinceLastFlush - } - } - bs.mu.Unlock() - timer.Reset(nextFlush) - } - } - }() - - return nil -} - -type batch struct { - ctx context.Context - request internal.Request - done chan struct{} - err error - - // requestsBlocked is the number of requests blocked in this batch - // that can be immediately released from activeRequests when batch sending completes. 
- requestsBlocked int64 -} - -func newEmptyBatch() *batch { - return &batch{ - ctx: context.Background(), - done: make(chan struct{}), - } -} - -// exportActiveBatch exports the active batch asynchronously and replaces it with a new one. -// Caller must hold the lock. -func (bs *BatchSender) exportActiveBatch() { - go func(b *batch) { - b.err = bs.NextSender.Send(b.ctx, b.request) - close(b.done) - bs.activeRequests.Add(-b.requestsBlocked) - }(bs.activeBatch) - bs.lastFlushed = time.Now() - bs.activeBatch = newEmptyBatch() -} - -// isActiveBatchReady returns true if the active batch is ready to be exported. -// The batch is ready if it has reached the minimum size or the concurrency limit is reached. -// Caller must hold the lock. -func (bs *BatchSender) isActiveBatchReady() bool { - return bs.activeBatch.request.ItemsCount() >= bs.cfg.MinSizeItems || - (bs.concurrencyLimit > 0 && bs.activeRequests.Load() >= bs.concurrencyLimit) -} - -func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { - // Stopped batch sender should act as pass-through to allow the queue to be drained. - if bs.stopped.Load() { - return bs.NextSender.Send(ctx, req) - } - - if bs.cfg.MaxSizeItems > 0 { - return bs.sendMergeSplitBatch(ctx, req) - } - return bs.sendMergeBatch(ctx, req) -} - -// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. 
-func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error { - bs.mu.Lock() - - var reqs []internal.Request - var mergeSplitErr error - if bs.activeBatch.request == nil { - reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil) - } else { - reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req) - } - - if mergeSplitErr != nil || len(reqs) == 0 { - bs.mu.Unlock() - return mergeSplitErr - } - - bs.activeRequests.Add(1) - if len(reqs) == 1 { - bs.activeBatch.requestsBlocked++ - } else { - // if there was a split, we want to make sure that bs.activeRequests is released once all of the parts are sent instead of using batch.requestsBlocked - defer bs.activeRequests.Add(-1) - } - if len(reqs) == 1 || bs.activeBatch.request != nil { - bs.updateActiveBatch(ctx, reqs[0]) - batch := bs.activeBatch - if bs.isActiveBatchReady() || len(reqs) > 1 { - bs.exportActiveBatch() - } - bs.mu.Unlock() - <-batch.done - if batch.err != nil { - return batch.err - } - reqs = reqs[1:] - } else { - bs.mu.Unlock() - } - - // Intentionally do not put the last request in the active batch to not block it. - // TODO: Consider including the partial request in the error to avoid double publishing. - for _, r := range reqs { - if err := bs.NextSender.Send(ctx, r); err != nil { - return err - } - } - return nil -} - -// sendMergeBatch sends the request to the batch and waits for the batch to be exported. 
-func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error { - bs.mu.Lock() - - if bs.activeBatch.request != nil { - res, err := bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req) - if err != nil { - bs.mu.Unlock() - return err - } - req = res[0] - } - - bs.activeRequests.Add(1) - bs.updateActiveBatch(ctx, req) - batch := bs.activeBatch - batch.requestsBlocked++ - if bs.isActiveBatchReady() { - bs.exportActiveBatch() - } - bs.mu.Unlock() - <-batch.done - return batch.err -} - -// updateActiveBatch update the active batch to the new merged request and context. -// The context is only set once and is not updated after the first call. -// Merging the context would be complex and require an additional goroutine to handle the context cancellation. -// We take the approach of using the context from the first request since it's likely to have the shortest timeout. -func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) { - if bs.activeBatch.request == nil { - bs.activeBatch.ctx = ctx - } - bs.activeBatch.request = req -} - -func (bs *BatchSender) Shutdown(context.Context) error { - bs.stopped.Store(true) - if bs.shutdownCh != nil { - close(bs.shutdownCh) - <-bs.shutdownCompleteCh - } - return nil -} diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go deleted file mode 100644 index ede5a7dd84bf..000000000000 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ /dev/null @@ -1,774 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal - -import ( - "context" - "errors" - "runtime" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/exporter/exporterbatcher" - "go.opentelemetry.io/collector/exporter/exporterqueue" - 
"go.opentelemetry.io/collector/exporter/internal" - "go.opentelemetry.io/collector/exporter/internal/requesttest" -) - -func TestBatchSender_Merge(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10758") - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - - tests := []struct { - name string - batcherOption Option - }{ - { - name: "split_disabled", - batcherOption: WithBatcher(cfg), - }, - { - name: "split_high_limit", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 1000 - return WithBatcher(c) - }(), - }, - } - - runTest := func(testName string, enableQueueBatcher bool, tt struct { - name string - batcherOption Option - }, - ) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - sink := requesttest.NewSink() - - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink})) - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 3, Sink: sink})) - - // the first two requests should be merged into one and sent by reaching the minimum items size - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 1 && sink.ItemsCount() == 11 - }, 50*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 3, Sink: sink})) - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink})) - - // the third and fifth requests should be sent by reaching the timeout - // the fourth request should be 
ignored because of the merge error. - time.Sleep(50 * time.Millisecond) - - // should be ignored because of the merge error. - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{ - Items: 3, Sink: sink, - MergeErr: errors.New("merge error"), - })) - - assert.Equal(t, int64(1), sink.RequestsCount()) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 2 && sink.ItemsCount() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } - for _, tt := range tests { - runTest(tt.name+"_enable_queue_batcher", true, tt) - runTest(tt.name+"_disable_queue_batcher", false, tt) - } -} - -func TestBatchSender_BatchExportError(t *testing.T) { - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 10 - tests := []struct { - name string - batcherOption Option - expectedRequests int64 - expectedItems int64 - }{ - { - name: "merge_only", - batcherOption: WithBatcher(cfg), - }, - { - name: "merge_without_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 200 - return WithBatcher(c) - }(), - }, - { - name: "merge_with_split_triggered", - batcherOption: func() Option { - c := cfg - c.MaxSizeItems = 20 - return WithBatcher(c) - }(), - expectedRequests: 1, - expectedItems: 20, - }, - } - runTest := func(testName string, enableQueueBatcher bool, tt struct { - name string - batcherOption Option - expectedRequests int64 - expectedItems int64 - }, - ) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - be := queueBatchExporter(t, tt.batcherOption) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - sink := requesttest.NewSink() - - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - require.NoError(t, be.Send(context.Background(), 
&requesttest.FakeRequest{Items: 4, Sink: sink})) - - // the first two requests should be blocked by the batchSender. - time.Sleep(50 * time.Millisecond) - assert.Equal(t, int64(0), sink.RequestsCount()) - - // the third request should trigger the export and cause an error. - errReq := &requesttest.FakeRequest{Items: 20, ExportErr: errors.New("transient error"), Sink: sink} - require.NoError(t, be.Send(context.Background(), errReq)) - - // the batch should be dropped since the queue doesn't have requeuing enabled. - assert.Eventually(t, func() bool { - return sink.RequestsCount() == tt.expectedRequests && - sink.ItemsCount() == tt.expectedItems && - be.QueueSender.(*QueueSender).queue.Size() == 0 - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } - for _, tt := range tests { - runTest(tt.name+"_enable_queue_batcher", true, tt) - runTest(tt.name+"_disable_queue_batcher", false, tt) - } -} - -func TestBatchSender_MergeOrSplit(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - - cfg := exporterbatcher.NewDefaultConfig() - cfg.MinSizeItems = 5 - cfg.MaxSizeItems = 10 - cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg)) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - sink := requesttest.NewSink() - - // should be sent right away by reaching the minimum items size. - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink})) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 1 && sink.ItemsCount() == 8 - }, 50*time.Millisecond, 10*time.Millisecond) - - // big request should be broken down into two requests, both are sent right away. 
- require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 17, Sink: sink})) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 3 && sink.ItemsCount() == 25 - }, 50*time.Millisecond, 10*time.Millisecond) - - // request that cannot be split should be dropped. - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{ - Items: 11, Sink: sink, - MergeErr: errors.New("split error"), - })) - - // big request should be broken down into two requests, both are sent right away. - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 13, Sink: sink})) - - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 5 && sink.ItemsCount() == 38 - }, 50*time.Millisecond, 10*time.Millisecond) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func TestBatchSender_Shutdown(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - batchCfg := exporterbatcher.NewDefaultConfig() - batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg)) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := requesttest.NewSink() - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 3, Sink: sink})) - - // To make the request reached the batchSender before shutdown. 
- time.Sleep(50 * time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) - - // shutdown should force sending the batch - assert.Equal(t, int64(1), sink.RequestsCount()) - assert.Equal(t, int64(3), sink.ItemsCount()) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func TestBatchSender_Disabled(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - cfg := exporterbatcher.NewDefaultConfig() - cfg.Enabled = false - cfg.MaxSizeItems = 5 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(cfg)) - require.NotNil(t, be) - require.NoError(t, err) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := requesttest.NewSink() - // should be sent right away without splitting because batching is disabled. - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink})) - assert.Equal(t, int64(1), sink.RequestsCount()) - assert.Equal(t, int64(8), sink.ItemsCount()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { -// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, -// error) { -// // reply with invalid 0 length slice if req2 is more than 20 items -// if req2.(*requesttest.FakeRequest).items > 20 { -// return []internal.Request{}, nil -// } -// // otherwise reply with a single request. 
-// return []internal.Request{req2}, nil -// } -// cfg := exporterbatcher.NewDefaultConfig() -// cfg.FlushTimeout = 50 * time.Millisecond -// cfg.MaxSizeItems = 20 -// be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) - -// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) -// t.Cleanup(func() { -// require.NoError(t, be.Shutdown(context.Background())) -// }) - -// sink := requesttest.NewSink() -// // first request should be ignored due to invalid merge/split function. -// require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 30, Sink: sink})) -// // second request should be sent after reaching the timeout. -// require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 15, Sink: sink})) -// assert.Eventually(t, func() bool { -// return sink.RequestsCount() == 1 && sink.ItemsCount() == 15 -// }, 100*time.Millisecond, 10*time.Millisecond) -// } - -func TestBatchSender_PostShutdown(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, be.Shutdown(context.Background())) - - // Closed batch sender should act as a pass-through to not block queue draining. 
- sink := requesttest.NewSink() - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 8, Sink: sink})) - assert.Equal(t, int64(1), sink.RequestsCount()) - assert.Equal(t, int64(8), sink.ItemsCount()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10810") - } - tests := []struct { - name string - batcherCfg exporterbatcher.Config - expectedRequests int64 - expectedItems int64 - }{ - { - name: "merge_only", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_without_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 20 * time.Millisecond - cfg.MaxSizeItems = 200 - return cfg - }(), - expectedRequests: 6, - expectedItems: 51, - }, - { - name: "merge_with_split_triggered", - batcherCfg: func() exporterbatcher.Config { - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 10 - return cfg - }(), - expectedRequests: 8, - expectedItems: 51, - }, - } - - // Why do we not expect the same behavior when usePullingBasedExporterQueueBatcher is true? - // This test checks that when concurrency limit of batch_sender is reached, the batch_sender will flush immediately. - // To avoid blocking, the concurrency limit is set to the number of concurrent goroutines that are in charge of - // reading from the queue and adding to batch. With the new model, we are pulling instead of pushing so we don't - // block the reading goroutine anymore. 
- defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, false)() - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - qCfg := exporterqueue.NewDefaultConfig() - qCfg.NumConsumers = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(tt.batcherCfg), - WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.NotNil(t, be) - require.NoError(t, err) - assert.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - sink := requesttest.NewSink() - // the 1st and 2nd request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 1 && sink.ItemsCount() == 4 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 3rd request should be flushed by itself due to flush interval - require.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 2 && sink.ItemsCount() == 6 - }, 100*time.Millisecond, 10*time.Millisecond) - - // the 4th and 5th request should be flushed in the same batched request by max concurrency limit. - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 2, Sink: sink})) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 3 && sink.ItemsCount() == 10 - }, 100*time.Millisecond, 10*time.Millisecond) - - // do it a few more times to ensure it produces the correct batch size regardless of goroutine scheduling. 
- assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 5, Sink: sink})) - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink})) - if tt.batcherCfg.MaxSizeItems == 10 { - // in case of MaxSizeItems=10, wait for the leftover request to send - assert.Eventually(t, func() bool { - return sink.RequestsCount() == 5 && sink.ItemsCount() == 21 - }, 50*time.Millisecond, 10*time.Millisecond) - } - - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 6, Sink: sink})) - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 20, Sink: sink})) - assert.Eventually(t, func() bool { - return sink.RequestsCount() == tt.expectedRequests && sink.ItemsCount() == tt.expectedItems - }, 100*time.Millisecond, 10*time.Millisecond) - }) - } -} - -func TestBatchSender_BatchBlocking(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 3 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := requesttest.NewSink() - - // send 6 blocking requests - wg := sync.WaitGroup{} - for i := 0; i < 6; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 10 * time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - // should be sent in two batches since the batch size is 3 - assert.Equal(t, int64(2), sink.RequestsCount()) - assert.Equal(t, int64(6), sink.ItemsCount()) - - require.NoError(t, 
be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -// Validate that the batch is cancelled once the first request in the request is cancelled -func TestBatchSender_BatchCancelled(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := requesttest.NewSink() - - // send 2 blocking requests - wg := sync.WaitGroup{} - ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) - go func() { - assert.ErrorIs(t, be.Send(ctx, &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - wg.Add(1) - go func() { - time.Sleep(20 * time.Millisecond) // ensure this call is the second - assert.ErrorIs(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 100 * time.Millisecond}), context.Canceled) - wg.Done() - }() - cancel() // canceling the first request should cancel the whole batch - wg.Wait() - - // nothing should be delivered - assert.Equal(t, int64(0), sink.RequestsCount()) - assert.Equal(t, int64(0), sink.ItemsCount()) - - require.NoError(t, be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func TestBatchSender_DrainActiveRequests(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bCfg := 
exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 2 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NotNil(t, be) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := requesttest.NewSink() - - // send 3 blocking requests with a timeout - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 40 * time.Millisecond})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 1, Sink: sink, Delay: 40 * time.Millisecond})) - }() - - // give time for the first two requests to be batched - time.Sleep(20 * time.Millisecond) - - // Shutdown should force the active batch to be dispatched and wait for all batches to be delivered. - // It should take 120 milliseconds to complete. 
- require.NoError(t, be.Shutdown(context.Background())) - - assert.Equal(t, int64(2), sink.RequestsCount()) - assert.Equal(t, int64(3), sink.ItemsCount()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func TestBatchSender_UnstartedShutdown(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(exporterbatcher.NewDefaultConfig())) - require.NoError(t, err) - - err = be.Shutdown(context.Background()) - require.NoError(t, err) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -// TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being -// merged. -// func TestBatchSender_ShutdownDeadlock(t *testing.T) { -// blockMerge := make(chan struct{}) -// waitMerge := make(chan struct{}, 10) - -// // blockedBatchMergeFunc blocks until the blockMerge channel is closed -// blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { -// waitMerge <- struct{}{} -// <-blockMerge -// r1.(*requesttest.FakeRequest).items += r2.(*requesttest.FakeRequest).items -// return r1, nil -// } - -// bCfg := exporterbatcher.NewDefaultConfig() -// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger -// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, -// WithBatcher(bCfg)) -// require.NoError(t, err) -// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - -// sink := requesttest.NewSink() - -// // Send 2 concurrent requests -// go func() { assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) }() -// go 
func() { assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) }() - -// // Wait for the requests to enter the merge function -// <-waitMerge - -// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, -// // then wait for the exporter to finish. -// startShutdown := make(chan struct{}) -// doneShutdown := make(chan struct{}) -// go func() { -// close(startShutdown) -// assert.NoError(t, be.Shutdown(context.Background())) -// close(doneShutdown) -// }() -// <-startShutdown -// close(blockMerge) -// <-doneShutdown - -// assert.EqualValues(t, 1, sink.RequestsCount()) -// assert.EqualValues(t, 8, sink.ItemsCount()) -// } - -func TestBatchSenderWithTimeout(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 10 - tCfg := NewDefaultTimeoutConfig() - tCfg.Timeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg), - WithTimeout(tCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := requesttest.NewSink() - - // Send 3 concurrent requests that should be merged in one batch - wg := sync.WaitGroup{} - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - wg.Done() - }() - } - wg.Wait() - assert.EqualValues(t, 1, sink.RequestsCount()) - assert.EqualValues(t, 12, sink.ItemsCount()) - - // 3 requests with a 90ms cumulative delay must be cancelled by the timeout sender - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - assert.Error(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink, Delay: 30 * 
time.Millisecond})) - wg.Done() - }() - } - wg.Wait() - - require.NoError(t, be.Shutdown(context.Background())) - - // The sink should not change - assert.EqualValues(t, 1, sink.RequestsCount()) - assert.EqualValues(t, 12, sink.ItemsCount()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -// func TestBatchSenderTimerResetNoConflict(t *testing.T) { -// delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { -// time.Sleep(30 * time.Millisecond) -// if r1 == nil { -// return r2, nil -// } -// fr1 := r1.(*requesttest.FakeRequest) -// fr2 := r2.(*requesttest.FakeRequest) -// if fr2.mergeErr != nil { -// return nil, fr2.mergeErr -// } -// return &requesttest.FakeRequest{ -// Items: fr1.items + fr2.items, -// Sink: fr1.sink, -// ExportErr: fr2.exportErr, -// Delay: fr1.delay + fr2.delay, -// }, nil -// } -// bCfg := exporterbatcher.NewDefaultConfig() -// bCfg.MinSizeItems = 8 -// bCfg.FlushTimeout = 50 * time.Millisecond -// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, -// WithBatcher(bCfg)) -// require.NoError(t, err) -// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) -// sink := requesttest.NewSink() - -// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer -// go func() { -// assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) -// }() -// time.Sleep(30 * time.Millisecond) -// go func() { -// assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) -// }() - -// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict -// assert.EventuallyWithT(t, func(c *assert.CollectT) { -// assert.LessOrEqual(c, int64(1), sink.RequestsCount()) -// assert.EqualValues(c, 8, sink.ItemsCount()) -// }, 
200*time.Millisecond, 10*time.Millisecond) - -// require.NoError(t, be.Shutdown(context.Background())) -// } - -func TestBatchSenderTimerFlush(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - if runtime.GOOS == "windows" { - t.Skip("skipping flaky test on Windows, see https://github.com/open-telemetry/opentelemetry-collector/issues/10802") - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := requesttest.NewSink() - time.Sleep(50 * time.Millisecond) - - // Send 2 concurrent requests that should be merged in one batch and sent immediately - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - }() - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - }() - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, int64(1), sink.RequestsCount()) - assert.EqualValues(c, 8, sink.ItemsCount()) - }, 30*time.Millisecond, 5*time.Millisecond) - - // Send another request that should be flushed after 100ms instead of 50ms since last flush - go func() { - assert.NoError(t, be.Send(context.Background(), &requesttest.FakeRequest{Items: 4, Sink: sink})) - }() - - // Confirm that it is not flushed in 50ms - time.Sleep(60 * time.Millisecond) - assert.LessOrEqual(t, int64(1), sink.RequestsCount()) - assert.EqualValues(t, 8, sink.ItemsCount()) - - // Confirm that it is flushed after 100ms (using 60+50=110 here to be safe) - time.Sleep(50 * time.Millisecond) - assert.LessOrEqual(t, int64(2), 
sink.RequestsCount()) - assert.EqualValues(t, 12, sink.ItemsCount()) - require.NoError(t, be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) -} - -func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { - opts = append(opts, WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, opts...) - require.NotNil(t, be) - require.NoError(t, err) - return be -} diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 4e20a672e737..ae0e5e820a2a 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -70,10 +70,8 @@ func (qCfg *QueueConfig) Validate() error { type QueueSender struct { BaseSender[internal.Request] - queue exporterqueue.Queue[internal.Request] - numConsumers int - batcher queue.Batcher - consumers *queue.Consumers[internal.Request] + queue exporterqueue.Queue[internal.Request] + batcher queue.Batcher obsrep *ObsReport exporterID component.ID @@ -89,11 +87,10 @@ func NewQueueSender( batcherCfg exporterbatcher.Config, ) *QueueSender { qs := &QueueSender{ - queue: q, - numConsumers: numConsumers, - obsrep: obsrep, - exporterID: set.ID, - logger: set.Logger, + queue: q, + obsrep: obsrep, + exporterID: set.ID, + logger: set.Logger, } exportFunc := func(ctx context.Context, req internal.Request) error { @@ -104,11 +101,7 @@ func NewQueueSender( } return err } - if usePullingBasedExporterQueueBatcher.IsEnabled() { - qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers) - } else { - qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, exportFunc) - } + qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers) return qs } @@ -118,14 +111,8 @@ func (qs *QueueSender) 
Start(ctx context.Context, host component.Host) error { return err } - if usePullingBasedExporterQueueBatcher.IsEnabled() { - if err := qs.batcher.Start(ctx, host); err != nil { - return err - } - } else { - if err := qs.consumers.Start(ctx, host); err != nil { - return err - } + if err := qs.batcher.Start(ctx, host); err != nil { + return err } exporterAttr := attribute.String(ExporterKey, qs.exporterID.String()) @@ -150,17 +137,13 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { // At the end, make sure metrics are un-registered since we want to free this object. defer qs.obsrep.TelemetryBuilder.Shutdown() - if err := qs.queue.Shutdown(ctx); err != nil { - return err - } - if usePullingBasedExporterQueueBatcher.IsEnabled() { - return qs.batcher.Shutdown(ctx) - } - err := qs.consumers.Shutdown(ctx) - return err + return errors.Join( + qs.queue.Shutdown(ctx), + qs.batcher.Shutdown(ctx), + ) } -// send implements the requestSender interface. It puts the request in the queue. +// Send implements the requestSender interface. It puts the request in the queue. func (qs *QueueSender) Send(ctx context.Context, req internal.Request) error { // Prevent cancellation and deadline to propagate to the context stored in the queue. // The grpc/http based receivers will cancel the request context after this function returns. 
diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 57ba7e37b2c0..cef7d0961fe1 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -31,112 +31,87 @@ import ( ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - firstMockR := newErrorRequest() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), firstMockR)) - }) - - // Enqueue another request to ensure when calling shutdown we drain the queue. - secondMockR := newMockRequest(3, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.Send(context.Background(), secondMockR)) - }) - - require.LessOrEqual(t, int64(1), be.QueueSender.(*QueueSender).queue.Size()) - - require.NoError(t, be.Shutdown(context.Background())) - - secondMockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 3) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + firstMockR := newErrorRequest() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), firstMockR)) + }) + + // Enqueue another request to ensure when calling shutdown we drain the queue. + secondMockR := newMockRequest(3, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(context.Background(), secondMockR)) + }) + + require.LessOrEqual(t, int64(1), be.QueueSender.(*QueueSender).queue.Size()) + + require.NoError(t, be.Shutdown(context.Background())) + + secondMockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 3) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) } func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - ctx, cancelFunc := context.WithCancel(context.Background()) - cancelFunc() - mockR := newMockRequest(2, nil) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.Send(ctx, mockR)) - }) - ocs.awaitAsyncProcessing() - - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + ctx, cancelFunc := context.WithCancel(context.Background()) + cancelFunc() + mockR := newMockRequest(2, nil) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(ctx, mockR)) + }) + ocs.awaitAsyncProcessing() + + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) } func TestQueuedRetry_RejectOnFull(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.QueueSize = 0 - qCfg.NumConsumers = 0 - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. 
Rejecting data.", observed.All()[0].Message) - assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.QueueSize = 0 + qCfg.NumConsumers = 0 + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter(set, defaultSignal, newNoopObsrepSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + require.Error(t, be.Send(context.Background(), newMockRequest(2, nil))) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data.", observed.All()[0].Message) + assert.Equal(t, "sending queue is full", observed.All()[0].ContextMap()["error"]) } func TestQueuedRetryHappyPath(t *testing.T) { @@ -192,13 +167,8 @@ func TestQueuedRetryHappyPath(t *testing.T) { }, } - runTest := func(testName string, enableQueueBatcher bool, tt struct { - name string - queueOptions []Option - }, - ) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { tel, err := componenttest.SetupTelemetry(defaultID) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) @@ -224,7 +194,6 @@ func TestQueuedRetryHappyPath(t *testing.T) { require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() }) // Wait until all batches received @@ -239,113 +208,86 @@ func 
TestQueuedRetryHappyPath(t *testing.T) { ocs.checkDroppedItemsCount(t, 0) }) } - for _, tt := range tests { - runTest(tt.name+"_enable_queue_batcher", true, tt) - runTest(tt.name+"_disable_queue_batcher", false, tt) - } } func TestQueuedRetry_QueueMetricsReported(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - dataTypes := []pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics} - for _, dataType := range dataTypes { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = -1 // to make QueueMetricsReportedvery request go straight to the queue - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - metadatatest.AssertEqualExporterQueueCapacity(t, tt.Telemetry, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String("exporter", defaultID.String())), - Value: int64(1000), - }, - }, metricdatatest.IgnoreTimestamp()) - - for i := 0; i < 7; i++ { - require.NoError(t, be.Send(context.Background(), newErrorRequest())) - } - metadatatest.AssertEqualExporterQueueSize(t, tt.Telemetry, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String("exporter", defaultID.String()), - attribute.String(DataTypeKey, dataType.String())), - Value: int64(7), - }, - }, metricdatatest.IgnoreTimestamp()) + dataTypes := 
[]pipeline.Signal{pipeline.SignalLogs, pipeline.SignalTraces, pipeline.SignalMetrics} + for _, dataType := range dataTypes { + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = -1 // to make every request go straight to the queue + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, dataType, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + + metadatatest.AssertEqualExporterQueueCapacity(t, tt.Telemetry, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("exporter", defaultID.String())), + Value: int64(1000), + }, + }, metricdatatest.IgnoreTimestamp()) + + for i := 0; i < 7; i++ { + require.NoError(t, be.Send(context.Background(), newErrorRequest())) + } + metadatatest.AssertEqualExporterQueueSize(t, tt.Telemetry, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("exporter", defaultID.String()), + attribute.String(DataTypeKey, dataType.String())), + Value: int64(7), + }, + }, metricdatatest.IgnoreTimestamp()) - assert.NoError(t, be.Shutdown(context.Background())) + assert.NoError(t, be.Shutdown(context.Background())) - // metrics should be unregistered at shutdown to prevent memory leak - var md metricdata.ResourceMetrics - require.NoError(t, tt.Reader.Collect(context.Background(), &md)) - assert.Empty(t, md.ScopeMetrics) - } - }) + // metrics should be unregistered at shutdown to prevent memory leak + var md metricdata.ResourceMetrics + require.NoError(t, tt.Reader.Collect(context.Background(), &md)) + 
assert.Empty(t, md.ScopeMetrics) } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) } func TestNoCancellationContext(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - deadline := time.Now().Add(1 * time.Second) - ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) - cancelFunc() - require.Error(t, ctx.Err()) - d, ok := ctx.Deadline() - require.True(t, ok) - require.Equal(t, deadline, d) - - nctx := context.WithoutCancel(ctx) - require.NoError(t, nctx.Err()) - d, ok = nctx.Deadline() - assert.False(t, ok) - assert.True(t, d.IsZero()) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + deadline := time.Now().Add(1 * time.Second) + ctx, cancelFunc := context.WithDeadline(context.Background(), deadline) + cancelFunc() + require.Error(t, ctx.Err()) + d, ok := ctx.Deadline() + require.True(t, ok) + require.Equal(t, deadline, d) + + nctx := context.WithoutCancel(ctx) + require.NoError(t, nctx.Err()) + d, ok = nctx.Deadline() + assert.False(t, ok) + assert.True(t, d.IsZero()) } func TestQueueConfig_Validate(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - qCfg := NewDefaultQueueConfig() - require.NoError(t, qCfg.Validate()) + qCfg := NewDefaultQueueConfig() + require.NoError(t, qCfg.Validate()) - qCfg.QueueSize = 0 - require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive") + qCfg.QueueSize = 0 + require.EqualError(t, qCfg.Validate(), "`queue_size` must be positive") - qCfg = NewDefaultQueueConfig() - qCfg.NumConsumers = 0 + qCfg = NewDefaultQueueConfig() + qCfg.NumConsumers = 0 - require.EqualError(t, qCfg.Validate(), "`num_consumers` must be 
positive") + require.EqualError(t, qCfg.Validate(), "`num_consumers` must be positive") - // Confirm Validate doesn't return error with invalid config when feature is disabled - qCfg.Enabled = false - assert.NoError(t, qCfg.Validate()) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) } func TestQueueRetryWithDisabledQueue(t *testing.T) { @@ -377,13 +319,8 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { }, } - runTest := func(testName string, enableQueueBatcher bool, tt struct { - name string - queueOptions []Option - }, - ) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { set := exportertest.NewNopSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) @@ -404,178 +341,135 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.NoError(t, be.Shutdown(context.Background())) }) } - for _, tt := range tests { - runTest(tt.name+"_enable_queue_batcher", true, tt) - runTest(tt.name+"_disable_queue_batcher", false, tt) - } } func TestQueueFailedRequestDropped(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - set := exportertest.NewNopSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := NewBaseExporter(set, pipeline.SignalLogs, newNoopObsrepSender, - WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - mockR := 
newMockRequest(2, errors.New("some error")) - require.NoError(t, be.Send(context.Background(), mockR)) - require.NoError(t, be.Shutdown(context.Background())) - mockR.checkNumRequests(t, 1) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Dropping data.", observed.All()[0].Message) - }) - } - - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + set := exportertest.NewNopSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := NewBaseExporter(set, pipeline.SignalLogs, newNoopObsrepSender, + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[internal.Request]())) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + mockR := newMockRequest(2, errors.New("some error")) + require.NoError(t, be.Send(context.Background(), mockR)) + require.NoError(t, be.Shutdown(context.Background())) + mockR.checkNumRequests(t, 1) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. 
Dropping data.", observed.All()[0].Message) } func TestQueuedRetryPersistenceEnabled(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueConfig() - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - extensions := map[component.ID]component.Component{ - storageID: storagetest.NewMockStorageExtension(nil), - } - host := &MockHost{Ext: extensions} - - // we start correctly with a file storage extension - require.NoError(t, be.Start(context.Background(), host)) - require.NoError(t, be.Shutdown(context.Background())) - }) + tt, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueConfig() + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), 
WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + extensions := map[component.ID]component.Component{ + storageID: storagetest.NewMockStorageExtension(nil), } + host := &MockHost{Ext: extensions} - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + // we start correctly with a file storage extension + require.NoError(t, be.Start(context.Background(), host)) + require.NoError(t, be.Shutdown(context.Background())) } func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - storageError := errors.New("could not get storage client") - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueConfig() - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - extensions := map[component.ID]component.Component{ - storageID: storagetest.NewMockStorageExtension(storageError), - } - host := &MockHost{Ext: extensions} - - // we fail to start if we get an error creating the storage client - require.Error(t, be.Start(context.Background(), host), "could not get storage client") - }) + storageError := errors.New("could not get storage client") + tt, err := 
componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + qCfg := NewDefaultQueueConfig() + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := NewBaseExporter(set, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + extensions := map[component.ID]component.Component{ + storageID: storagetest.NewMockStorageExtension(storageError), } + host := &MockHost{Ext: extensions} - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + // we fail to start if we get an error creating the storage client + require.Error(t, be.Start(context.Background(), host), "could not get storage client") } func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown - - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered - - mockReq := newErrorRequest() - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(mockReq)), 
WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - - extensions := map[component.ID]component.Component{ - storageID: storagetest.NewMockStorageExtension(nil), - } - host := &MockHost{Ext: extensions} + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + storageID := component.MustNewIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown + + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = time.Millisecond + rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered + + mockReq := newErrorRequest() + be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + + extensions := map[component.ID]component.Component{ + storageID: storagetest.NewMockStorageExtension(nil), + } + host := &MockHost{Ext: extensions} - require.NoError(t, be.Start(context.Background(), host)) + require.NoError(t, be.Start(context.Background(), host)) - // Invoke queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.Send(context.Background(), mockReq)) + // Invoke queuedRetrySender so the producer will put the item for consumer to poll + require.NoError(t, be.Send(context.Background(), mockReq)) - // first wait for the item to be consumed from the queue - assert.Eventually(t, func() bool { - return be.QueueSender.(*QueueSender).queue.Size() == 0 - }, time.Second, 1*time.Millisecond) + // first wait for the item to be consumed from the queue + assert.Eventually(t, func() bool { + return be.QueueSender.(*QueueSender).queue.Size() == 0 + }, time.Second, 1*time.Millisecond) - // shuts down the exporter, unsent data should be preserved as in-flight data in the persistent queue. 
- require.NoError(t, be.Shutdown(context.Background())) + // shuts down the exporter, unsent data should be preserved as in-flight data in the persistent queue. + require.NoError(t, be.Shutdown(context.Background())) - // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. - replacedReq := newMockRequest(1, nil) - be, err = NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), host)) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) + // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. + replacedReq := newMockRequest(1, nil) + be, err = NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), host)) + t.Cleanup(func() { + require.NoError(t, be.Shutdown(context.Background())) + }) - // wait for the item to be consumed from the queue - replacedReq.checkNumRequests(t, 1) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + // wait for the item to be consumed from the queue + replacedReq.checkNumRequests(t, 1) } func TestQueueSenderNoStartShutdown(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - set := exportertest.NewNopSettings() - set.ID = exporterID - queue := exporterqueue.NewMemoryQueueFactory[internal.Request]()( - context.Background(), - 
exporterqueue.Settings{ - Signal: pipeline.SignalTraces, - ExporterSettings: set, - }, - exporterqueue.NewDefaultConfig()) - obsrep, err := NewObsReport(ObsReportSettings{ - ExporterSettings: set, - Signal: pipeline.SignalTraces, - }) - require.NoError(t, err) - qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig()) - assert.NoError(t, qs.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + set := exportertest.NewNopSettings() + set.ID = exporterID + queue := exporterqueue.NewMemoryQueueFactory[internal.Request]()( + context.Background(), + exporterqueue.Settings{ + Signal: pipeline.SignalTraces, + ExporterSettings: set, + }, + exporterqueue.NewDefaultConfig()) + obsrep, err := NewObsReport(ObsReportSettings{ + ExporterSettings: set, + Signal: pipeline.SignalTraces, + }) + require.NoError(t, err) + qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig()) + assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index 470f4c62e82e..0bd391ffa0f0 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -38,155 +38,123 @@ func mockRequestMarshaler(internal.Request) ([]byte, error) { } func TestQueuedRetry_DropOnPermanentError(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - rCfg := configretry.NewDefaultBackOffConfig() - mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), 
WithUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + rCfg := configretry.NewDefaultBackOffConfig() + mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) } func TestQueuedRetry_DropOnNoRetry(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.Enabled = false - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), - WithUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), - WithQueue(qCfg), WithRetry(rCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - mockR := newMockRequest(2, errors.New("transient error")) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.Enabled = false + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, WithMarshaler(mockRequestMarshaler), + WithUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), + WithQueue(qCfg), WithRetry(rCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockR := newMockRequest(2, errors.New("transient error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) } func TestQueuedRetry_OnError(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 0 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) - mockR := newMockRequest(2, traceErr) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 0 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) + mockR := newMockRequest(2, traceErr) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = time.Millisecond - rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - ocs.run(func() { - // Add an item that will always fail. - require.NoError(t, be.Send(context.Background(), newErrorRequest())) - }) - - mockR := newMockRequest(2, nil) - start := time.Now() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). - waitingTime := time.Since(start) - assert.Less(t, 50*time.Millisecond, waitingTime) - assert.Less(t, waitingTime, 150*time.Millisecond) - - // In the newMockConcurrentExporter we count requests and items even for failed requests. 
- mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = time.Millisecond + rCfg.MaxElapsedTime = 100 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + ocs.run(func() { + // Add an item that will always fail. + require.NoError(t, be.Send(context.Background(), newErrorRequest())) + }) + + mockR := newMockRequest(2, nil) + start := time.Now() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // We should ensure that we wait for more than 50ms but less than 150ms (50% less and 50% more than max elapsed). + waitingTime := time.Since(start) + assert.Less(t, 50*time.Millisecond, waitingTime) + assert.Less(t, waitingTime, 150*time.Millisecond) + + // In the newMockConcurrentExporter we count requests and items even for failed requests. 
+ mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 7) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) } type wrappedError struct { @@ -198,198 +166,161 @@ func (e wrappedError) Unwrap() error { } func TestQueuedRetry_ThrottleError(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 10 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond) - mockR := newMockRequest(2, wrappedError{retry}) - start := time.Now() - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms. 
- assert.Less(t, 100*time.Millisecond, time.Since(start)) - - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 10 * time.Millisecond + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + retry := NewThrottleRetry(errors.New("throttle error"), 100*time.Millisecond) + mockR := newMockRequest(2, wrappedError{retry}) + start := time.Now() + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. + require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms. 
+ assert.Less(t, 100*time.Millisecond, time.Since(start)) + + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - resetFeatureGate := setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher) - qCfg := NewDefaultQueueConfig() - qCfg.NumConsumers = 1 - qCfg.QueueSize = 1 - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.InitialInterval = 0 - be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, - WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), - WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - resetFeatureGate() - }) - - mockR := newMockRequest(2, errors.New("transient error")) - ocs.run(func() { - // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - - // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) - ocs.checkSendItemsCount(t, 2) - ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + qCfg := NewDefaultQueueConfig() + qCfg.NumConsumers = 1 + qCfg.QueueSize = 1 + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.InitialInterval = 0 + be, err := NewBaseExporter(defaultSettings, defaultSignal, newObservabilityConsumerSender, + WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) + require.NoError(t, err) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + mockR := newMockRequest(2, errors.New("transient error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.NoError(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + + // In the newMockConcurrentExporter we count requests and items even for failed requests + mockR.checkNumRequests(t, 2) + ocs.checkSendItemsCount(t, 2) + ocs.checkDroppedItemsCount(t, 0) + require.Zero(t, be.QueueSender.(*QueueSender).queue.Size()) } func TestQueueRetryWithNoQueue(t *testing.T) { - runTest := func(testName string, enableQueueBatcher bool) { - t.Run(testName, func(t *testing.T) { - defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)() - rCfg := configretry.NewDefaultBackOffConfig() - rCfg.MaxElapsedTime = time.Nanosecond // fail fast - be, err := NewBaseExporter(exportertest.NewNopSettings(), pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.ObsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.Send(context.Background(), mockR)) - }) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) - }) - } - runTest("enable_queue_batcher", true) - runTest("disable_queue_batcher", false) + rCfg := configretry.NewDefaultBackOffConfig() + rCfg.MaxElapsedTime = time.Nanosecond // fail fast + be, err := NewBaseExporter(exportertest.NewNopSettings(), pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.ObsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.Send(context.Background(), mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + 
+	ocs.checkSendItemsCount(t, 0)
+	ocs.checkDroppedItemsCount(t, 2)
+	require.NoError(t, be.Shutdown(context.Background()))
 }
 
 func TestQueueRetryWithDisabledRetires(t *testing.T) {
-	runTest := func(testName string, enableQueueBatcher bool) {
-		t.Run(testName, func(t *testing.T) {
-			defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
-			rCfg := configretry.NewDefaultBackOffConfig()
-			rCfg.Enabled = false
-			set := exportertest.NewNopSettings()
-			logger, observed := observer.New(zap.ErrorLevel)
-			set.Logger = zap.New(logger)
-			be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg))
-			require.NoError(t, err)
-			require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-			ocs := be.ObsrepSender.(*observabilityConsumerSender)
-			mockR := newMockRequest(2, errors.New("some error"))
-			ocs.run(func() {
-				require.Error(t, be.Send(context.Background(), mockR))
-			})
-			assert.Len(t, observed.All(), 1)
-			assert.Equal(t, "Exporting failed. Rejecting data. "+
-				"Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message)
-			ocs.awaitAsyncProcessing()
-			mockR.checkNumRequests(t, 1)
-			ocs.checkSendItemsCount(t, 0)
-			ocs.checkDroppedItemsCount(t, 2)
-			require.NoError(t, be.Shutdown(context.Background()))
-		})
-	}
-	runTest("enable_queue_batcher", true)
-	runTest("disable_queue_batcher", false)
+	rCfg := configretry.NewDefaultBackOffConfig()
+	rCfg.Enabled = false
+	set := exportertest.NewNopSettings()
+	logger, observed := observer.New(zap.ErrorLevel)
+	set.Logger = zap.New(logger)
+	be, err := NewBaseExporter(set, pipeline.SignalLogs, newObservabilityConsumerSender, WithRetry(rCfg))
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	ocs := be.ObsrepSender.(*observabilityConsumerSender)
+	mockR := newMockRequest(2, errors.New("some error"))
+	ocs.run(func() {
+		require.Error(t, be.Send(context.Background(), mockR))
+	})
+	assert.Len(t, observed.All(), 1)
+	assert.Equal(t, "Exporting failed. Rejecting data. "+
+		"Try enabling retry_on_failure config option to retry on retryable errors.", observed.All()[0].Message)
+	ocs.awaitAsyncProcessing()
+	mockR.checkNumRequests(t, 1)
+	ocs.checkSendItemsCount(t, 0)
+	ocs.checkDroppedItemsCount(t, 2)
+	require.NoError(t, be.Shutdown(context.Background()))
 }
 
 func TestRetryWithContextTimeout(t *testing.T) {
-	runTest := func(testName string, enableQueueBatcher bool) {
-		t.Run(testName, func(t *testing.T) {
-			defer setFeatureGateForTest(t, usePullingBasedExporterQueueBatcher, enableQueueBatcher)()
-			const testTimeout = 10 * time.Second
-
-			rCfg := configretry.NewDefaultBackOffConfig()
-			rCfg.Enabled = true
-
-			// First attempt after 100ms is attempted
-			rCfg.InitialInterval = 100 * time.Millisecond
-			rCfg.RandomizationFactor = 0
-			// Second attempt is at twice the testTimeout
-			rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval)
-			qCfg := exporterqueue.NewDefaultConfig()
-			qCfg.Enabled = false
-			set := exportertest.NewNopSettings()
-			logger, observed := observer.New(zap.InfoLevel)
-			set.Logger = zap.New(logger)
-			be, err := NewBaseExporter(
-				set,
-				pipeline.SignalLogs,
-				newObservabilityConsumerSender,
-				WithRetry(rCfg),
-				WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
-			)
-			require.NoError(t, err)
-			require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
-			ocs := be.ObsrepSender.(*observabilityConsumerSender)
-			mockR := newErrorRequest()
-
-			start := time.Now()
-			ocs.run(func() {
-				ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
-				defer cancel()
-				err := be.Send(ctx, mockR)
-				require.Error(t, err)
-				require.Equal(t, "request will be cancelled before next retry: transient error", err.Error())
-			})
-			assert.Len(t, observed.All(), 2)
-			assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message)
-			assert.Equal(t, "Exporting failed. Rejecting data. "+
-				"Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message)
-			ocs.awaitAsyncProcessing()
-			ocs.checkDroppedItemsCount(t, 7)
-			require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests())
-			require.NoError(t, be.Shutdown(context.Background()))
-
-			// There should be no delay, because the initial interval is
-			// longer than the context timeout. Merely checking that no
-			// delays on the order of either the context timeout or the
-			// retry interval were introduced, i.e., fail fast.
-			elapsed := time.Since(start)
-			require.Less(t, elapsed, testTimeout/2)
-		})
-	}
-	runTest("enable_queue_batcher", true)
-	runTest("disable_queue_batcher", false)
+	const testTimeout = 10 * time.Second
+
+	rCfg := configretry.NewDefaultBackOffConfig()
+	rCfg.Enabled = true
+
+	// First attempt after 100ms is attempted
+	rCfg.InitialInterval = 100 * time.Millisecond
+	rCfg.RandomizationFactor = 0
+	// Second attempt is at twice the testTimeout
+	rCfg.Multiplier = float64(2 * testTimeout / rCfg.InitialInterval)
+	qCfg := exporterqueue.NewDefaultConfig()
+	qCfg.Enabled = false
+	set := exportertest.NewNopSettings()
+	logger, observed := observer.New(zap.InfoLevel)
+	set.Logger = zap.New(logger)
+	be, err := NewBaseExporter(
+		set,
+		pipeline.SignalLogs,
+		newObservabilityConsumerSender,
+		WithRetry(rCfg),
+		WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]()),
+	)
+	require.NoError(t, err)
+	require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
+	ocs := be.ObsrepSender.(*observabilityConsumerSender)
+	mockR := newErrorRequest()
+
+	start := time.Now()
+	ocs.run(func() {
+		ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+		defer cancel()
+		err := be.Send(ctx, mockR)
+		require.Error(t, err)
+		require.Equal(t, "request will be cancelled before next retry: transient error", err.Error())
+	})
+	assert.Len(t, observed.All(), 2)
+	assert.Equal(t, "Exporting failed. Will retry the request after interval.", observed.All()[0].Message)
+	assert.Equal(t, "Exporting failed. Rejecting data. "+
+		"Try enabling sending_queue to survive temporary failures.", observed.All()[1].Message)
+	ocs.awaitAsyncProcessing()
+	ocs.checkDroppedItemsCount(t, 7)
+	require.Equal(t, 2, mockR.(*mockErrorRequest).getNumRequests())
+	require.NoError(t, be.Shutdown(context.Background()))
+
+	// There should be no delay, because the initial interval is
+	// longer than the context timeout. Merely checking that no
+	// delays on the order of either the context timeout or the
+	// retry interval were introduced, i.e., fail fast.
+	elapsed := time.Since(start)
+	require.Less(t, elapsed, testTimeout/2)
 }
 
 type mockErrorRequest struct {