Skip to content

Commit

Permalink
Fix data races in cloudwatchlogs (#1551)
Browse files: browse the repository at this point in the history
  • Loading branch information
jefchien authored Feb 13, 2025
1 parent 4d57714 commit 8e12340
Show file tree
Hide file tree
Showing 12 changed files with 314 additions and 180 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ PKG_WITH_DATA_RACE += internal/retryer
PKG_WITH_DATA_RACE += internal/tls
PKG_WITH_DATA_RACE += plugins/inputs/logfile
PKG_WITH_DATA_RACE += plugins/inputs/logfile/tail
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch
PKG_WITH_DATA_RACE += plugins/outputs/cloudwatch$$
PKG_WITH_DATA_RACE += plugins/processors/awsapplicationsignals
PKG_WITH_DATA_RACE += plugins/processors/ec2tagger
PKG_WITH_DATA_RACE_PATTERN := $(shell echo '$(PKG_WITH_DATA_RACE)' | tr ' ' '|')
Expand Down
24 changes: 10 additions & 14 deletions plugins/outputs/cloudwatchlogs/internal/pusher/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
package pusher

import (
"bytes"
"io"
"log"
"os"
"strings"
"testing"
"time"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"

"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)

type stubLogEvent struct {
Expand Down Expand Up @@ -44,7 +41,7 @@ func newStubLogEvent(message string, timestamp time.Time) *stubLogEvent {
}

func TestConverter(t *testing.T) {
logger := testutil.Logger{Name: "converter"}
logger := testutil.NewNopLogger()
target := Target{Group: "testGroup", Stream: "testStream"}

t.Run("WithValidTimestamp", func(t *testing.T) {
Expand Down Expand Up @@ -86,20 +83,19 @@ func TestConverter(t *testing.T) {

t.Run("WithOldTimestampWarning", func(t *testing.T) {
oldTime := time.Now().Add(-25 * time.Hour)
conv := newConverter(logger, target)
logSink := testutil.NewLogSink()
conv := newConverter(logSink, target)
conv.lastValidTime = oldTime
conv.lastUpdateTime = oldTime

var logbuf bytes.Buffer
log.SetOutput(io.MultiWriter(&logbuf, os.Stdout))
le := conv.convert(newStubLogEvent("Test message", time.Time{}))

assert.Equal(t, oldTime, le.timestamp)
assert.Equal(t, "Test message", le.message)
loglines := strings.Split(strings.TrimSpace(logbuf.String()), "\n")
assert.Len(t, loglines, 1)
logline := loglines[0]
assert.True(t, strings.Contains(logline, "W!"))
assert.True(t, strings.Contains(logline, "Unable to parse timestamp"))
logLines := logSink.Lines()
assert.Len(t, logLines, 1)
logLine := logLines[0]
assert.True(t, strings.Contains(logLine, "W!"))
assert.True(t, strings.Contains(logLine, "Unable to parse timestamp"))
})
}
5 changes: 5 additions & 0 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type workerPool struct {
workerCount atomic.Int32
wg sync.WaitGroup
stopCh chan struct{}
stopLock sync.RWMutex
}

// NewWorkerPool creates a pool of workers of the specified size.
Expand Down Expand Up @@ -53,6 +54,8 @@ func (p *workerPool) worker() {

// Submit adds a task to the pool. Blocks until a worker is available to receive the task or the pool is stopped.
func (p *workerPool) Submit(task func()) {
p.stopLock.RLock()
defer p.stopLock.RUnlock()
select {
case <-p.stopCh:
return
Expand All @@ -72,6 +75,8 @@ func (p *workerPool) WorkerCount() int32 {

// Stop closes the channels and waits for the workers to stop.
func (p *workerPool) Stop() {
p.stopLock.Lock()
defer p.stopLock.Unlock()
select {
case <-p.stopCh:
return
Expand Down
4 changes: 2 additions & 2 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)

func TestWorkerPool(t *testing.T) {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestWorkerPool(t *testing.T) {
}

func TestSenderPool(t *testing.T) {
logger := testutil.Logger{Name: "test"}
logger := testutil.NewNopLogger()
stop := make(chan struct{})
mockService := new(mockLogsService)
mockService.On("PutLogEvents", mock.Anything).Return(&cloudwatchlogs.PutLogEventsOutput{}, nil)
Expand Down
10 changes: 5 additions & 5 deletions plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"testing"
"time"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"

"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
"github.com/aws/amazon-cloudwatch-agent/tool/testutil"
)

const eventCount = 100000
Expand All @@ -22,7 +22,7 @@ func TestPusher(t *testing.T) {
t.Parallel()
stop := make(chan struct{})
var wg sync.WaitGroup
pusher := setupPusher(t, "single", nil, stop, &wg)
pusher := setupPusher(t, nil, stop, &wg)

var completed atomic.Int32
generateEvents(t, pusher, &completed)
Expand All @@ -36,7 +36,7 @@ func TestPusher(t *testing.T) {
stop := make(chan struct{})
var wg sync.WaitGroup
wp := NewWorkerPool(5)
pusher := setupPusher(t, "pool", wp, stop, &wg)
pusher := setupPusher(t, wp, stop, &wg)

_, isSenderPool := pusher.Sender.(*senderPool)
assert.True(t, isSenderPool)
Expand All @@ -63,9 +63,9 @@ func generateEvents(t *testing.T, pusher *Pusher, completed *atomic.Int32) {
}
}

func setupPusher(t *testing.T, name string, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher {
func setupPusher(t *testing.T, workerPool WorkerPool, stop chan struct{}, wg *sync.WaitGroup) *Pusher {
t.Helper()
logger := testutil.Logger{Name: name}
logger := testutil.NewNopLogger()
target := Target{Group: "G", Stream: "S", Retention: 7}
service := new(stubLogsService)
service.ple = func(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) {
Expand Down
15 changes: 10 additions & 5 deletions plugins/outputs/cloudwatchlogs/internal/pusher/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type queue struct {
flushCh chan struct{}
resetTimerCh chan struct{}
flushTimer *time.Timer
flushTimeout time.Duration
flushTimeout atomic.Value
stop <-chan struct{}
lastSentTime atomic.Value

Expand Down Expand Up @@ -61,11 +61,11 @@ func newQueue(
flushCh: make(chan struct{}),
resetTimerCh: make(chan struct{}),
flushTimer: time.NewTimer(flushTimeout),
flushTimeout: flushTimeout,
stop: stop,
startNonBlockCh: make(chan struct{}),
wg: wg,
}
q.flushTimeout.Store(flushTimeout)
q.wg.Add(1)
go q.start()
return q
Expand Down Expand Up @@ -112,13 +112,15 @@ func (q *queue) start() {

// Merge events from both the blocking and non-blocking channels
go func() {
var nonBlockingEventsCh <-chan logs.LogEvent
for {
select {
case e := <-q.eventsCh:
mergeChan <- e
case e := <-q.nonBlockingEventsCh:
case e := <-nonBlockingEventsCh:
mergeChan <- e
case <-q.startNonBlockCh:
nonBlockingEventsCh = q.nonBlockingEventsCh
case <-q.stop:
return
}
Expand All @@ -141,7 +143,8 @@ func (q *queue) start() {
q.batch.append(event)
case <-q.flushCh:
lastSentTime, _ := q.lastSentTime.Load().(time.Time)
if time.Since(lastSentTime) >= q.flushTimeout && len(q.batch.events) > 0 {
flushTimeout, _ := q.flushTimeout.Load().(time.Duration)
if time.Since(lastSentTime) >= flushTimeout && len(q.batch.events) > 0 {
q.send()
} else {
q.resetFlushTimer()
Expand Down Expand Up @@ -188,7 +191,9 @@ func (q *queue) manageFlushTimer() {
q.flushCh <- struct{}{}
case <-q.resetTimerCh:
q.stopFlushTimer()
q.flushTimer.Reset(q.flushTimeout)
if flushTimeout, ok := q.flushTimeout.Load().(time.Duration); ok {
q.flushTimer.Reset(flushTimeout)
}
case <-q.stop:
q.stopFlushTimer()
return
Expand Down
Loading

0 comments on commit 8e12340

Please sign in to comment.