Skip to content

Commit

Permalink
Merge branch 'v2' into keremcankabadayi/v2
Browse files Browse the repository at this point in the history
# Conflicts:
#	consumer_base.go
  • Loading branch information
Abdulsametileri committed Jun 1, 2024
2 parents 72deca3 + ed713be commit 413227a
Show file tree
Hide file tree
Showing 19 changed files with 538 additions and 108 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | |
| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | |
| `batchConfiguration.balancer` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Balancer) | leastBytes |
| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" |
| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
Expand Down
29 changes: 29 additions & 0 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package kafka

import "github.com/segmentio/kafka-go"

type Balancer kafka.Balancer

func GetBalancerCRC32() Balancer {
return &kafka.CRC32Balancer{}
}

func GetBalancerHash() Balancer {
return &kafka.Hash{}
}

func GetBalancerLeastBytes() Balancer {
return &kafka.LeastBytes{}
}

func GetBalancerMurmur2Balancer() Balancer {
return &kafka.Murmur2Balancer{}
}

func GetBalancerReferenceHash() Balancer {
return &kafka.ReferenceHash{}
}

func GetBalancerRoundRobin() Balancer {
return &kafka.RoundRobin{}
}
66 changes: 66 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kafka

import (
"reflect"
"testing"
)

func TestGetBalancerCRC32(t *testing.T) {
balancer := GetBalancerCRC32()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.CRC32Balancer" {
t.Errorf("Expected *kafka.CRC32Balancer, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerHash(t *testing.T) {
balancer := GetBalancerHash()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.Hash" {
t.Errorf("Expected *kafka.Hash, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerLeastBytes(t *testing.T) {
balancer := GetBalancerLeastBytes()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.LeastBytes" {
t.Errorf("Expected *kafka.LeastBytes, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerMurmur2Balancer(t *testing.T) {
balancer := GetBalancerMurmur2Balancer()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.Murmur2Balancer" {
t.Errorf("Expected *kafka.Murmur2Balancer, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerReferenceHash(t *testing.T) {
balancer := GetBalancerReferenceHash()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.ReferenceHash" {
t.Errorf("Expected *kafka.ReferenceHash, got %s", reflect.TypeOf(balancer).String())
}
}

func TestGetBalancerRoundRobinh(t *testing.T) {
balancer := GetBalancerRoundRobin()
if balancer == nil {
t.Error("Expected non-nil balancer, got nil")
}
if reflect.TypeOf(balancer).String() != "*kafka.RoundRobin" {
t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String())
}
}
20 changes: 15 additions & 5 deletions batch_consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka

import (
"errors"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -41,9 +42,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
}

if cfg.RetryEnabled {
c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error {
return c.consumeFn([]*Message{toMessage(message)})
})
c.base.setupCronsumer(cfg, c.runKonsumerFn)
}

if cfg.APIEnabled {
Expand All @@ -53,6 +52,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return &c, nil
}

func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error {
msgList := []*Message{toMessage(message)}

err := b.consumeFn(msgList)
if msgList[0].ErrDescription != "" {
err = errors.New(msgList[0].ErrDescription)
}
return err
}

func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector {
return b.base.GetMetricCollectors()
}
Expand Down Expand Up @@ -176,14 +185,15 @@ func (b *batchConsumer) process(chunkMessages []*Message) {

if consumeErr != nil && b.retryEnabled {
cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages))
errorMessage := consumeErr.Error()
if b.transactionalRetry {
for i := range chunkMessages {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
}
} else {
for i := range chunkMessages {
if chunkMessages[i].IsFailed {
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic))
cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage))
}
}
}
Expand Down
37 changes: 36 additions & 1 deletion batch_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func Test_batchConsumer_startBatch_with_preBatch(t *testing.T) {
messageProcessedStream: make(chan struct{}, 1),
metric: &ConsumerMetric{},
wg: sync.WaitGroup{},
messageGroupDuration: 500 * time.Millisecond,
messageGroupDuration: 20 * time.Second,
r: &mc,
concurrency: 1,
},
Expand Down Expand Up @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) {
}
}

func Test_batchConsumer_runKonsumerFn(t *testing.T) {
t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("default error")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
return expectedError
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})

t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) {
// Given
expectedError := errors.New("message error description")
bc := batchConsumer{consumeFn: func(messages []*Message) error {
messages[0].ErrDescription = "message error description"
return errors.New("default error")
}}

// When
actualError := bc.runKonsumerFn(kcronsumer.Message{})

// Then
if actualError.Error() != expectedError.Error() {
t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error())
}
})
}

func createMessages(partitionStart int, partitionEnd int) []*Message {
messages := make([]*Message, 0)
for i := partitionStart; i < partitionEnd; i++ {
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (c *consumer) process(message *Message) {
}

if consumeErr != nil && c.retryEnabled {
retryableMsg := message.toRetryableMessage(c.retryTopic)
retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error())
if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil {
c.logger.Errorf("Error producing message %s to exception/retry topic %s",
string(retryableMsg.Value), produceErr.Error())
Expand Down
29 changes: 24 additions & 5 deletions consumer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ type Consumer interface {
Consume()

// Pause function pauses consumer, it is stop consuming new messages
// It works idempotent under the hood
// Calling with multiple goroutines is safe
Pause()

// Resume function resumes consumer, it is start to working
// It works idempotent under the hood
// Calling with multiple goroutines is safe
Resume()

// GetMetricCollectors for the purpose of making metric collectors available.
Expand Down Expand Up @@ -67,8 +71,6 @@ type base struct {
incomingMessageStream chan *IncomingMessage
singleConsumingStream chan *Message
batchConsumingStream chan []*Message
brokers []string
topic string
retryTopic string
subprocesses subprocesses
wg sync.WaitGroup
Expand All @@ -80,6 +82,7 @@ type base struct {
distributedTracingEnabled bool
consumerState state
metricPrefix string
mu sync.Mutex
}

func NewConsumer(cfg *ConsumerConfig) (Consumer, error) {
Expand Down Expand Up @@ -118,8 +121,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) {
consumerState: stateRunning,
skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn,
metricPrefix: cfg.MetricPrefix,
brokers: cfg.Reader.Brokers,
topic: cfg.Reader.Topic,
mu: sync.Mutex{},
}

if cfg.DistributedTracingEnabled {
Expand Down Expand Up @@ -177,6 +179,7 @@ func (c *base) startConsume() {
m := &kafka.Message{}
err := c.r.FetchMessage(c.context, m)
if err != nil {
c.logger.Debug("c.r.FetchMessage ", err.Error())
if c.context.Err() != nil {
continue
}
Expand Down Expand Up @@ -207,7 +210,15 @@ func (c *base) startConsume() {
}

func (c *base) Pause() {
c.logger.Info("Consumer is paused!")
c.mu.Lock()
defer c.mu.Unlock()

if c.consumerState == statePaused {
c.logger.Debug("Consumer is already paused mode!")
return
}

c.logger.Infof("Consumer is paused!")

c.cancelFn()

Expand All @@ -217,6 +228,14 @@ func (c *base) Pause() {
}

func (c *base) Resume() {
c.mu.Lock()
defer c.mu.Unlock()

if c.consumerState == stateRunning {
c.logger.Debug("Consumer is already running mode!")
return
}

c.logger.Info("Consumer is resumed!")

c.pause = make(chan struct{})
Expand Down
Loading

0 comments on commit 413227a

Please sign in to comment.