Skip to content

Commit

Permalink
feat: make Pause and Resume methods idempotent (#124)
Browse files Browse the repository at this point in the history
* feat: make Resume Pause as idempotent

* chore: add log pause/resume

* chore: add description pause and resume

* chore: add tests

* chore: fix test
  • Loading branch information
A.Samet İleri authored Apr 2, 2024
1 parent ac1f317 commit 3239d0f
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 39 deletions.
2 changes: 1 addition & 1 deletion batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
messageGroupDuration: 20 * time.Second,
r: &mc,
concurrency: 1,
},
Expand Down
25 changes: 24 additions & 1 deletion consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ type Consumer interface {
Consume()

// Pause function pauses consumer, it is stop consuming new messages
// It works idempotent under the hood
// Calling with multiple goroutines is safe
Pause()

// Resume function resumes consumer, it is start to working
// It works idempotent under the hood
// Calling with multiple goroutines is safe
Resume()

// GetMetricCollectors for the purpose of making metric collectors available.
Expand Down Expand Up @@ -78,6 +82,7 @@ type base struct {
distributedTracingEnabled bool
consumerState state
metricPrefix string
mu sync.Mutex
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -116,6 +121,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
consumerState: stateRunning,
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
metricPrefix: cfg.MetricPrefix,
mu: sync.Mutex{},
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -173,6 +179,7 @@ func (c *base) startConsume() {
m := &kafka.Message{}
err := c.r.FetchMessage(c.context, m)
if err != nil {
c.logger.Debug("c.r.FetchMessage ", err.Error())
if c.context.Err() != nil {
continue
}
Expand Down Expand Up @@ -203,7 +210,15 @@ func (c *base) startConsume() {
}

func (c *base) Pause() {
c.logger.Info("Consumer is paused!")
c.mu.Lock()
defer c.mu.Unlock()

if c.consumerState == statePaused {
c.logger.Debug("Consumer is already paused mode!")
return
}

c.logger.Infof("Consumer is paused!")

c.cancelFn()

Expand All @@ -213,6 +228,14 @@ func (c *base) Pause() {
}

func (c *base) Resume() {
c.mu.Lock()
defer c.mu.Unlock()

if c.consumerState == stateRunning {
c.logger.Debug("Consumer is already running mode!")
return
}

c.logger.Info("Consumer is resumed!")

c.pause = make(chan struct{})
Expand Down
147 changes: 110 additions & 37 deletions consumer_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,50 +99,123 @@ func Test_base_startConsume(t *testing.T) {
}

func Test_base_Pause(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
}
go func() {
<-b.pause
}()
t.Run("Call_One_Goroutine", func(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
mu: sync.Mutex{},
}
go func() {
<-b.pause
}()

// When
b.Pause()
// When
b.Pause()

// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
})
t.Run("Call_Multiple_Goroutine", func(t *testing.T) {
// Given
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
context: ctx, cancelFn: cancelFn,
consumerState: stateRunning,
mu: sync.Mutex{},
}
go func() {
<-b.pause
}()

// When
var wg sync.WaitGroup
wg.Add(2)
go func() {
b.Pause()
wg.Done()
}()
go func() {
b.Pause()
wg.Done()
}()
wg.Wait()

// Then
if b.consumerState != statePaused {
t.Fatal("consumer state must be in paused")
}
})
}

func Test_base_Resume(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
}
t.Run("Call_One_Goroutine", func(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
mu: sync.Mutex{},
}

// When
b.Resume()
// When
b.Resume()

// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
})
t.Run("Call_Multiple_Goroutine", func(t *testing.T) {
// Given
mc := mockReader{}
ctx, cancelFn := context.WithCancel(context.Background())
b := base{
r: &mc,
logger: NewZapLogger(LogLevelDebug),
pause: make(chan struct{}),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
context: ctx, cancelFn: cancelFn,
mu: sync.Mutex{},
}

// When
var wg sync.WaitGroup
wg.Add(2)
go func() {
b.Resume()
wg.Done()
}()
go func() {
b.Resume()
wg.Done()
}()
wg.Wait()

// Then
if b.consumerState != stateRunning {
t.Fatal("consumer state must be in running")
}
if ctx == b.context {
t.Fatal("contexts must be differ!")
}
})
}

type mockReader struct {
Expand Down

0 comments on commit 3239d0f

Please sign in to comment.