diff --git a/Makefile b/Makefile index 9e5dd23d3f..9bfd55a958 100644 --- a/Makefile +++ b/Makefile @@ -210,7 +210,7 @@ PKG_WITH_DATA_RACE += internal/retryer PKG_WITH_DATA_RACE += internal/tls PKG_WITH_DATA_RACE += plugins/inputs/logfile PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail -PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch +PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch$$ PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals PKG_WITH_DATA_RACE += plugins/processors/ec2tagger PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|') diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go index 52b40ef71c..badedb89d1 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go @@ -4,16 +4,13 @@ package pusher import ( - "bytes" - "io" - "log" - "os" "strings" "testing" "time" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" + + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" ) type stubLogEvent struct { @@ -44,7 +41,7 @@ func newStubLogEvent(message string, timestamp time.Time) *stubLogEvent { } func TestConverter(t *testing.T) { - logger := testutil.Logger{Name: "converter"} + logger := testutil.NewNopLogger() target := Target{Group: "testGroup", Stream: "testStream"} t.Run("WithValidTimestamp", func(t *testing.T) { @@ -86,20 +83,19 @@ func TestConverter(t *testing.T) { t.Run("WithOldTimestampWarning", func(t *testing.T) { oldTime := time.Now().Add(-25 * time.Hour) - conv := newConverter(logger, target) + logSink := testutil.NewLogSink() + conv := newConverter(logSink, target) conv.lastValidTime = oldTime conv.lastUpdateTime = oldTime - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) le := conv.convert(newStubLogEvent("Test message", time.Time{})) assert.Equal(t, oldTime, le.timestamp) assert.Equal(t, "Test message", le.message) - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - assert.Len(t, loglines, 1) - logline := loglines[0] - assert.True(t, strings.Contains(logline, "W!")) - assert.True(t, strings.Contains(logline, "Unable to parse timestamp")) + logLines := logSink.Lines() + assert.Len(t, logLines, 1) + logLine := logLines[0] + assert.True(t, strings.Contains(logLine, "W!")) + assert.True(t, strings.Contains(logLine, "Unable to parse timestamp")) }) } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go index 3bf9eb7f34..f5afeb309a 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool.go @@ -19,6 +19,7 @@ type workerPool struct { workerCount atomic.Int32 wg sync.WaitGroup stopCh chan struct{} + stopLock sync.RWMutex } // NewWorkerPool creates a pool of workers of the specified size. @@ -53,6 +54,8 @@ func (p *workerPool) worker() { // Submit adds a task to the pool. Blocks until a worker is available to receive the task or the pool is stopped. func (p *workerPool) Submit(task func()) { + p.stopLock.RLock() + defer p.stopLock.RUnlock() select { case <-p.stopCh: return @@ -72,6 +75,8 @@ func (p *workerPool) WorkerCount() int32 { // Stop closes the channels and waits for the workers to stop. func (p *workerPool) Stop() { + p.stopLock.Lock() + defer p.stopLock.Unlock() select { case <-p.stopCh: return diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go index d2962e85ff..b706688034 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go @@ -9,11 +9,11 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" ) func TestWorkerPool(t *testing.T) { @@ -104,7 +104,7 @@ func TestWorkerPool(t *testing.T) { } func TestSenderPool(t *testing.T) { - logger := testutil.Logger{Name: "test"} + logger := testutil.NewNopLogger() stop := make(chan struct{}) mockService := new(mockLogsService) mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil) diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go index 2ab3970f78..54f68621f9 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go @@ -9,10 +9,10 @@ import ( "testing" "time" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" ) const eventCount = 100000 @@ -22,7 +22,7 @@ func TestPusher(t *testing.T) { t.Parallel() stop := make(chan struct{}) var wg sync.WaitGroup - pusher := setupPusher(t, "single", nil, stop, &wg) + pusher := setupPusher(t, nil, stop, &wg) var completed atomic.Int32 generateEvents(t, pusher, &completed) @@ -36,7 +36,7 @@ func TestPusher(t *testing.T) { stop := make(chan struct{}) var wg sync.WaitGroup wp := NewWorkerPool(5) - pusher := setupPusher(t, "pool", wp, stop, &wg) + pusher := setupPusher(t, wp, stop, &wg) _, isSenderPool := pusher.Sender.(*senderPool) assert.True(t, isSenderPool) @@ -63,9 +63,9 @@ func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) { } } -func setupPusher(t *testing.T, name string, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher { +func setupPusher(t *testing.T, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher { t.Helper() - logger := testutil.Logger{Name: name} + logger := testutil.NewNopLogger() target := Target{Group: "G", Stream: "S", Retention: 7} service := new(stubLogsService) service.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go index 89f26faed4..da3a28a25a 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue.go @@ -33,7 +33,7 @@ type queue struct { flushCh chan struct{} resetTimerCh chan struct{} flushTimer *time.Timer - flushTimeout time.Duration + flushTimeout atomic.Value stop <-chan struct{} lastSentTime atomic.Value @@ -61,11 +61,11 @@ func newQueue( flushCh: make(chan struct{}), resetTimerCh: make(chan struct{}), flushTimer: time.NewTimer(flushTimeout), - flushTimeout: flushTimeout, stop: stop, startNonBlockCh: make(chan struct{}), wg: wg, } + q.flushTimeout.Store(flushTimeout) q.wg.Add(1) go q.start() return q @@ -112,13 +112,15 @@ func (q *queue) start() { // Merge events from both blocking and non-blocking channel go func() { + var nonBlockingEventsCh <-chan logs.LogEvent for { select { case e := <-q.eventsCh: mergeChan <- e - case e := <-q.nonBlockingEventsCh: + case e := <-nonBlockingEventsCh: mergeChan <- e case <-q.startNonBlockCh: + nonBlockingEventsCh = q.nonBlockingEventsCh case <-q.stop: return } @@ -141,7 +143,8 @@ func (q *queue) start() { q.batch.append(event) case <-q.flushCh: lastSentTime, _ := q.lastSentTime.Load().(time.Time) - if time.Since(lastSentTime) >= q.flushTimeout && len(q.batch.events) > 0 { + flushTimeout, _ := q.flushTimeout.Load().(time.Duration) + if time.Since(lastSentTime) >= flushTimeout && len(q.batch.events) > 0 { q.send() } else { q.resetFlushTimer() @@ -188,7 +191,9 @@ func (q *queue) manageFlushTimer() { q.flushCh <- struct{}{} case <-q.resetTimerCh: q.stopFlushTimer() - q.flushTimer.Reset(q.flushTimeout) + if flushTimeout, ok := q.flushTimeout.Load().(time.Duration); ok { + q.flushTimer.Reset(flushTimeout) + } case <-q.stop: q.stopFlushTimer() return diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go index 3d81078cab..d12fe0f48d 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go @@ -4,24 +4,22 @@ package pusher import ( - "bytes" "errors" "fmt" - "io" - "log" - "os" "strings" "sync" + "sync/atomic" "testing" "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/influxdata/telegraf/testutil" + "github.com/influxdata/telegraf" "github.com/stretchr/testify/require" "github.com/aws/amazon-cloudwatch-agent/logs" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" "github.com/aws/amazon-cloudwatch-agent/tool/util" ) @@ -72,7 +70,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { t.Parallel() var wg sync.WaitGroup var s stubLogsService - called := false + var called atomic.Bool expectedEntity := &cloudwatchlogs.Entity{ Attributes: map[string]*string{ "PlatformType": aws.String("AWS::EC2"), @@ -87,7 +85,7 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { } s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - called = true + called.Store(true) if *in.LogGroupName != "G" || *in.LogStreamName != "S" { t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName) @@ -101,15 +99,16 @@ func TestAddSingleEvent_WithAccountId(t *testing.T) { } ep := newMockEntityProvider(expectedEntity) - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) - require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") + require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") - q.flushTimeout = time.Second + q.flushTimeout.Store(200 * time.Millisecond) + time.Sleep(10 * time.Millisecond) q.resetFlushTimer() - time.Sleep(2 * time.Second) - require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.") + time.Sleep(time.Second) + require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") close(stop) wg.Wait() @@ -119,10 +118,10 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { t.Parallel() var wg sync.WaitGroup var s stubLogsService - called := false + var called atomic.Bool s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - called = true + called.Store(true) if *in.LogGroupName != "G" || *in.LogStreamName != "S" { t.Errorf("PutLogEvents called with wrong group and stream: %v/%v", *in.LogGroupName, *in.LogStreamName) @@ -136,15 +135,16 @@ func TestAddSingleEvent_WithoutAccountId(t *testing.T) { } ep := newMockEntityProvider(nil) - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, ep, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) - require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") + require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") - q.flushTimeout = time.Second + q.flushTimeout.Store(time.Second) + time.Sleep(10 * time.Millisecond) q.resetFlushTimer() time.Sleep(2 * time.Second) - require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.") + require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") close(stop) wg.Wait() @@ -154,27 +154,27 @@ func TestStopQueueWouldDoFinalSend(t *testing.T) { t.Parallel() var wg sync.WaitGroup var s stubLogsService - called := false + var called atomic.Bool s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - called = true + called.Store(true) if len(in.LogEvents) != 1 { t.Errorf("PutLogEvents called with incorrect number of message, expecting 1, but %v received", len(in.LogEvents)) } return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) time.Sleep(10 * time.Millisecond) - require.False(t, called, "PutLogEvents has been called too fast, it should wait until FlushTimeout.") + require.False(t, called.Load(), "PutLogEvents has been called too fast, it should wait until FlushTimeout.") close(stop) wg.Wait() - require.True(t, called, "PutLogEvents has not been called after FlushTimeout has been reached.") + require.True(t, called.Load(), "PutLogEvents has not been called after FlushTimeout has been reached.") } func TestStopPusherWouldStopRetries(t *testing.T) { @@ -182,27 +182,27 @@ func TestStopPusherWouldStopRetries(t *testing.T) { var wg sync.WaitGroup var s stubLogsService - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return nil, &cloudwatchlogs.ServiceUnavailableException{} } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now())) + time.Sleep(10 * time.Millisecond) - sendComplete := make(chan struct{}) - - go func() { - defer close(sendComplete) - q.send() - }() - + triggerSend(t, q) + // stop should try flushing the remaining events with retry disabled close(stop) - select { - case <-time.After(50 * time.Millisecond): - t.Errorf("send did not quit retrying after p has been Stopped.") - case <-sendComplete: - } + time.Sleep(50 * time.Millisecond) + wg.Wait() + + logLines := logSink.Lines() + require.Equal(t, 3, len(logLines), fmt.Sprintf("Expecting 3 logs, but %d received", len(logLines))) + lastLine := logLines[len(logLines)-1] + require.True(t, strings.Contains(lastLine, "E!")) + require.True(t, strings.Contains(lastLine, "Stop requested after 0 retries to G/S failed for PutLogEvents, request dropped")) } func TestLongMessageGetsTruncated(t *testing.T) { @@ -231,14 +231,10 @@ func TestLongMessageGetsTruncated(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent(longMsg, time.Now())) - for len(q.batch.events) < 1 { - time.Sleep(10 * time.Millisecond) - } - - q.send() + triggerSend(t, q) close(stop) wg.Wait() } @@ -262,13 +258,13 @@ func TestRequestIsLessThan1MB(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 8; i++ { q.AddEvent(newStubLogEvent(longMsg, time.Now())) } time.Sleep(10 * time.Millisecond) - q.send() - q.send() + triggerSend(t, q) + triggerSend(t, q) close(stop) wg.Wait() } @@ -287,13 +283,13 @@ func TestRequestIsLessThan10kEvents(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 30000; i++ { q.AddEvent(newStubLogEvent(msg, time.Now())) } time.Sleep(10 * time.Millisecond) for i := 0; i < 5; i++ { - q.send() + triggerSend(t, q) } close(stop) wg.Wait() @@ -312,44 +308,41 @@ func TestTimestampPopulation(t *testing.T) { return &cloudwatchlogs.PutLogEventsOutput{}, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for i := 0; i < 3; i++ { q.AddEvent(newStubLogEvent("msg", time.Time{})) } time.Sleep(10 * time.Millisecond) for i := 0; i < 5; i++ { - q.send() + triggerSend(t, q) } close(stop) wg.Wait() } func TestIgnoreOutOfTimeRangeEvent(t *testing.T) { + t.Parallel() var wg sync.WaitGroup var s stubLogsService - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { t.Errorf("PutLogEvents should not be called for out of range events") return &cloudwatchlogs.PutLogEventsOutput{}, nil } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, q := testPreparation(-1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG", time.Now().Add(-15*24*time.Hour))) q.AddEventNonBlocking(newStubLogEvent("MSG", time.Now().Add(2*time.Hour+1*time.Minute))) - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - require.Equal(t, 2, len(loglines), fmt.Sprintf("Expecting 2 error logs, but %d received", len(loglines))) + logLines := logSink.Lines() + require.Equal(t, 2, len(logLines), fmt.Sprintf("Expecting 2 error logs, but %d received", len(logLines))) - for _, logline := range loglines { - require.True(t, strings.Contains(logline, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logbuf.String())) - require.True(t, strings.Contains(logline, "Discard the log entry"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logbuf.String())) + for _, logLine := range logLines { + require.True(t, strings.Contains(logLine, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logSink)) + require.True(t, strings.Contains(logLine, "Discard the log entry"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logSink)) } - log.SetOutput(os.Stderr) - time.Sleep(20 * time.Millisecond) close(stop) wg.Wait() @@ -390,12 +383,13 @@ func TestAddMultipleEvents(t *testing.T) { )) } evts[10], evts[90] = evts[90], evts[10] // make events out of order - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) for _, e := range evts { q.AddEvent(e) } - q.flushTimeout = 10 * time.Millisecond + q.flushTimeout.Store(10 * time.Millisecond) + time.Sleep(10 * time.Millisecond) q.resetFlushTimer() time.Sleep(time.Second) @@ -408,10 +402,10 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { t.Parallel() var wg sync.WaitGroup var s stubLogsService + var ci atomic.Int32 - ci := 0 s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - if ci == 0 { + if ci.Load() == 0 { if len(in.LogEvents) != 3 { t.Errorf("PutLogEvents called with incorrect number of message, expecting 3, but %v received", len(in.LogEvents)) } @@ -422,9 +416,9 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { } } - ci++ + ci.Add(1) return &cloudwatchlogs.PutLogEventsOutput{}, nil - } else if ci == 1 { + } else if ci.Load() == 1 { if len(in.LogEvents) != 1 { t.Errorf("PutLogEvents called with incorrect number of message, expecting 1, but %v received", len(in.LogEvents)) } @@ -440,12 +434,13 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { return nil, nil } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("MSG 25hrs ago", time.Now().Add(-25*time.Hour))) q.AddEvent(newStubLogEvent("MSG 24hrs ago", time.Now().Add(-24*time.Hour))) q.AddEvent(newStubLogEvent("MSG 23hrs ago", time.Now().Add(-23*time.Hour))) q.AddEvent(newStubLogEvent("MSG now", time.Now())) - q.flushTimeout = 10 * time.Millisecond + q.flushTimeout.Store(10 * time.Millisecond) + time.Sleep(10 * time.Millisecond) q.resetFlushTimer() time.Sleep(20 * time.Millisecond) close(stop) @@ -453,45 +448,43 @@ func TestSendReqWhenEventsSpanMoreThan24Hrs(t *testing.T) { } func TestUnhandledErrorWouldNotResend(t *testing.T) { + t.Parallel() var wg sync.WaitGroup var s stubLogsService + var cnt atomic.Int32 - cnt := 0 - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - if cnt == 0 { - cnt++ + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + if cnt.Load() == 0 { + cnt.Add(1) return nil, errors.New("unhandled error") } t.Errorf("Pusher should not attempt a resend when an unhandled error has been returned") return &cloudwatchlogs.PutLogEventsOutput{}, nil } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, q := testPreparation(-1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, 2*time.Hour, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) - logline := logbuf.String() - require.True(t, strings.Contains(logline, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logbuf.String())) - require.True(t, strings.Contains(logline, "unhandled error"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logbuf.String())) - - log.SetOutput(os.Stderr) + logLine := logSink.String() + require.True(t, strings.Contains(logLine, "E!"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) + require.True(t, strings.Contains(logLine, "unhandled error"), fmt.Sprintf("Expecting error log with unhandled error, but received '%s' in the log", logLine)) close(stop) wg.Wait() - require.Equal(t, 1, cnt, fmt.Sprintf("Expecting pusher to call send 1 time, but %d times called", cnt)) + require.EqualValues(t, 1, cnt.Load(), fmt.Sprintf("Expecting pusher to call send 1 time, but %d times called", cnt.Load())) } func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { + t.Parallel() var wg sync.WaitGroup var s stubLogsService - var plec, clgc, clsc int - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + var plec, clgc, clsc atomic.Int32 + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { var e error - switch plec { + switch plec.Load() { case 0: e = &cloudwatchlogs.ResourceNotFoundException{} case 1: @@ -499,40 +492,39 @@ func TestCreateLogGroupAndLogStreamWhenNotFound(t *testing.T) { case 2: return &cloudwatchlogs.PutLogEventsOutput{}, nil default: - t.Errorf("Unexpected PutLogEvents call (%d time)", plec) + t.Errorf("Unexpected PutLogEvents call (%d time)", plec.Load()) } - plec++ + plec.Add(1) return nil, e } - s.clg = func(in *cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { - clgc++ + s.clg = func(*cloudwatchlogs.CreateLogGroupInput) (*cloudwatchlogs.CreateLogGroupOutput, error) { + clgc.Add(1) return nil, nil } - s.cls = func(in *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { - clsc++ + s.cls = func(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + clsc.Add(1) return nil, nil } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) - q.AddEvent(newStubLogEvent("msg", time.Now())) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + var eventWG sync.WaitGroup + eventWG.Add(1) + q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) time.Sleep(10 * time.Millisecond) - q.send() + triggerSend(t, q) + eventWG.Wait() foundUnknownErr := false - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - for _, logline := range loglines { - if strings.Contains(logline, "E!") && strings.Contains(logline, "Unknown Error") { + logLines := logSink.Lines() + for _, logLine := range logLines { + if strings.Contains(logLine, "E!") && strings.Contains(logLine, "Unknown Error") { foundUnknownErr = true } } - require.True(t, foundUnknownErr, fmt.Sprintf("Expecting error log with unknown error, but received '%s' in the log", logbuf.String())) - - log.SetOutput(os.Stderr) + require.True(t, foundUnknownErr, fmt.Sprintf("Expecting error log with unknown error, but received '%s' in the log", logSink)) close(stop) wg.Wait() @@ -542,7 +534,7 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { var wg sync.WaitGroup var s stubLogsService - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { return &cloudwatchlogs.PutLogEventsOutput{ RejectedLogEventsInfo: &cloudwatchlogs.RejectedLogEventsInfo{ TooOldLogEventEndIndex: aws.Int64(100), @@ -552,30 +544,29 @@ func TestLogRejectedLogEntryInfo(t *testing.T) { }, nil } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) - q.AddEvent(newStubLogEvent("msg", time.Now())) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) + var eventWG sync.WaitGroup + eventWG.Add(1) + q.AddEvent(&stubLogEvent{message: "msg", timestamp: time.Now(), done: eventWG.Done}) time.Sleep(10 * time.Millisecond) - q.send() - - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - require.Len(t, loglines, 4, fmt.Sprintf("Expecting 3 error logs, but %d received", len(loglines))) + triggerSend(t, q) - logline := loglines[0] - require.True(t, strings.Contains(logline, "W!"), fmt.Sprintf("Expecting error log events too old, but received '%s' in the log", logbuf.String())) - require.True(t, strings.Contains(logline, "100"), fmt.Sprintf("Expecting error log events too old, but received '%s' in the log", logbuf.String())) + eventWG.Wait() + logLines := logSink.Lines() + require.Len(t, logLines, 4, fmt.Sprintf("Expecting 3 error logs, but %d received", len(logLines))) - logline = loglines[1] - require.True(t, strings.Contains(logline, "W!"), fmt.Sprintf("Expecting error log events too new, but received '%s' in the log", logbuf.String())) - require.True(t, strings.Contains(logline, "200"), fmt.Sprintf("Expecting error log events too new, but received '%s' in the log", logbuf.String())) + logLine := logLines[0] + require.True(t, strings.Contains(logLine, "W!"), fmt.Sprintf("Expecting error log events too old, but received '%s' in the log", logSink.String())) + require.True(t, strings.Contains(logLine, "100"), fmt.Sprintf("Expecting error log events too old, but received '%s' in the log", logSink.String())) - logline = loglines[2] - require.True(t, strings.Contains(logline, "W!"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logbuf.String())) - require.True(t, strings.Contains(logline, "300"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logbuf.String())) + logLine = logLines[1] + require.True(t, strings.Contains(logLine, "W!"), fmt.Sprintf("Expecting error log events too new, but received '%s' in the log", logSink.String())) + require.True(t, strings.Contains(logLine, "200"), fmt.Sprintf("Expecting error log events too new, but received '%s' in the log", logSink.String())) - log.SetOutput(os.Stderr) + logLine = logLines[2] + require.True(t, strings.Contains(logLine, "W!"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) + require.True(t, strings.Contains(logLine, "300"), fmt.Sprintf("Expecting error log events too expired, but received '%s' in the log", logSink.String())) close(stop) wg.Wait() @@ -603,9 +594,7 @@ func TestAddEventNonBlocking(t *testing.T) { start.Add(time.Duration(i)*time.Millisecond), )) } - stop, q := testPreparation(-1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) - q.flushTimeout = 50 * time.Millisecond - q.resetFlushTimer() + stop, q := testPreparation(t, -1, &s, 1*time.Hour, 2*time.Hour, nil, &wg) time.Sleep(200 * time.Millisecond) // Wait until pusher started, merge channel is blocked for _, e := range evts { @@ -613,49 +602,78 @@ func TestAddEventNonBlocking(t *testing.T) { } time.Sleep(time.Second) + triggerSend(t, q) + time.Sleep(20 * time.Millisecond) close(stop) wg.Wait() } func TestResendWouldStopAfterExhaustedRetries(t *testing.T) { + t.Parallel() var wg sync.WaitGroup var s stubLogsService + var cnt atomic.Int32 - cnt := 0 - s.ple = func(in *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { - cnt++ + s.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + cnt.Add(1) return nil, &cloudwatchlogs.ServiceUnavailableException{} } - var logbuf bytes.Buffer - log.SetOutput(io.MultiWriter(&logbuf, os.Stdout)) - - stop, q := testPreparation(-1, &s, 10*time.Millisecond, time.Second, nil, &wg) + logSink := testutil.NewLogSink() + stop, q := testPreparationWithLogger(t, logSink, -1, &s, 10*time.Millisecond, time.Second, nil, &wg) q.AddEvent(newStubLogEvent("msg", time.Now())) time.Sleep(2 * time.Second) - loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n") - lastline := loglines[len(loglines)-1] - expected := fmt.Sprintf("All %v retries to G/S failed for PutLogEvents, request dropped.", cnt-1) - require.True(t, strings.HasSuffix(lastline, expected), fmt.Sprintf("Expecting error log to end with request dropped, but received '%s' in the log", logbuf.String())) - - log.SetOutput(os.Stderr) + logLines := logSink.Lines() + lastLine := logLines[len(logLines)-1] + expected := fmt.Sprintf("All %v retries to G/S failed for PutLogEvents, request dropped.", cnt.Load()-1) + require.True(t, strings.HasSuffix(lastLine, expected), fmt.Sprintf("Expecting error log to end with request dropped, but received '%s' in the log", logSink.String())) close(stop) wg.Wait() } +// Cannot call q.send() directly as it would cause a race condition. Reset last sent time and trigger flush. +func triggerSend(t *testing.T, q *queue) { + t.Helper() + q.lastSentTime.Store(time.Time{}) + q.flushCh <- struct{}{} +} + func testPreparation( + t *testing.T, + retention int, + service cloudWatchLogsService, + flushTimeout time.Duration, + retryDuration time.Duration, + entityProvider logs.LogEntityProvider, + wg *sync.WaitGroup, +) (chan struct{}, *queue) { + return testPreparationWithLogger( + t, + testutil.NewNopLogger(), + retention, + service, + flushTimeout, + retryDuration, + entityProvider, + wg, + ) +} + +func testPreparationWithLogger( + t *testing.T, + logger telegraf.Logger, retention int, - service *stubLogsService, + service cloudWatchLogsService, flushTimeout time.Duration, retryDuration time.Duration, entityProvider logs.LogEntityProvider, wg *sync.WaitGroup, ) (chan struct{}, *queue) { + t.Helper() stop := make(chan struct{}) - logger := testutil.Logger{Name: "test"} tm := NewTargetManager(logger, service) s := newSender(logger, service, tm, retryDuration, stop) q := newQueue( diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go b/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go index edb12eb49d..b981211582 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/retry.go @@ -34,10 +34,6 @@ const ( maxRetryDelay = 1 * time.Minute ) -var ( - seededRand = rand.New(rand.NewSource(time.Now().UnixNano())) // nolint:gosec -) - type retryWaitStrategy int const ( @@ -61,7 +57,11 @@ func retryWait(baseRetryDelay time.Duration, maxBackoffRetries int, retryCount i if retryCount < maxBackoffRetries { d = baseRetryDelay * time.Duration(1< maxRetryDelayTarget { delay = maxRetryDelayTarget } - return time.Duration(seededRand.Int63n(int64(delay/2)) + int64(delay/2)) + return withJitter(delay) } diff --git a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go index 2aa0b07d9e..2a1f120f3b 100644 --- a/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go +++ b/plugins/outputs/cloudwatchlogs/internal/pusher/target_test.go @@ -11,15 +11,15 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs" + "github.com/aws/amazon-cloudwatch-agent/tool/testutil" ) func TestTargetManager(t *testing.T) { - logger := testutil.Logger{Name: "test"} + logger := testutil.NewNopLogger() t.Run("CreateLogStream", func(t *testing.T) { target := Target{Group: "G", Stream: "S"} diff --git a/tool/testutil/testutil.go b/tool/testutil/testutil.go index f26e838048..3cc3200913 100644 --- a/tool/testutil/testutil.go +++ b/tool/testutil/testutil.go @@ -5,8 +5,12 @@ package testutil import ( "fmt" + "strings" + "sync" "testing" + "github.com/influxdata/telegraf" + "github.com/aws/amazon-cloudwatch-agent/tool/stdin" ) @@ -34,3 +38,109 @@ func Type(inputChan chan<- string, inputString ...string) { } }() } + +type LogSink struct { + mu sync.Mutex + lines []string +} + +var _ telegraf.Logger = (*LogSink)(nil) + +func NewLogSink() *LogSink { + return &LogSink{ + lines: make([]string, 0), + } +} + +func (l *LogSink) Errorf(format string, args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "E! "+fmt.Sprintf(format, args...)) +} + +func (l *LogSink) Error(args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "E! "+fmt.Sprint(args...)) +} + +func (l *LogSink) Debugf(format string, args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "D! "+fmt.Sprintf(format, args...)) +} + +func (l *LogSink) Debug(args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "D! "+fmt.Sprint(args...)) +} + +func (l *LogSink) Warnf(format string, args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "W! "+fmt.Sprintf(format, args...)) +} + +func (l *LogSink) Warn(args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "W! "+fmt.Sprint(args...)) +} + +func (l *LogSink) Infof(format string, args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "I! "+fmt.Sprintf(format, args...)) +} + +func (l *LogSink) Info(args ...any) { + l.mu.Lock() + defer l.mu.Unlock() + l.lines = append(l.lines, "I! "+fmt.Sprint(args...)) +} + +func (l *LogSink) Lines() []string { + l.mu.Lock() + defer l.mu.Unlock() + lines := make([]string, len(l.lines)) + copy(lines, l.lines) + return lines +} + +func (l *LogSink) String() string { + return strings.Join(l.Lines(), "\n") +} + +type NopLogger struct { +} + +var _ telegraf.Logger = (*NopLogger)(nil) + +func NewNopLogger() telegraf.Logger { + return &NopLogger{} +} + +func (n NopLogger) Errorf(string, ...interface{}) { +} + +func (n NopLogger) Error(...interface{}) { +} + +func (n NopLogger) Debugf(string, ...interface{}) { +} + +func (n NopLogger) Debug(...interface{}) { +} + +func (n NopLogger) Warnf(string, ...interface{}) { +} + +func (n NopLogger) Warn(...interface{}) { +} + +func (n NopLogger) Infof(string, ...interface{}) { +} + +func (n NopLogger) Info(...interface{}) { +}