From b210f832f9296c8625d288b810de06cda9a7e881 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Fri, 16 Feb 2024 00:37:32 +0300 Subject: [PATCH 01/13] feat: add header filter feature --- consumer_base.go | 10 ++++ consumer_base_test.go | 39 ++++++++++++++ consumer_config.go | 5 +- examples/with-header-filter-consumer/main.go | 47 ++++++++++++++++ test/integration/go.sum | 3 +- test/integration/integration_test.go | 57 ++++++++++++++++++++ 6 files changed, 157 insertions(+), 4 deletions(-) create mode 100644 examples/with-header-filter-consumer/main.go diff --git a/consumer_base.go b/consumer_base.go index fcb7cf2..35f057b 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -54,6 +54,7 @@ type base struct { context context.Context r Reader cancelFn context.CancelFunc + skipMessageByHeaderFn SkipMessageByHeaderFn metric *ConsumerMetric pause chan struct{} quit chan struct{} @@ -107,6 +108,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { singleConsumingStream: make(chan *Message, cfg.Concurrency), batchConsumingStream: make(chan []*Message, cfg.Concurrency), consumerState: stateRunning, + skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, } if cfg.DistributedTracingEnabled { @@ -159,6 +161,14 @@ func (c *base) startConsume() { continue } + if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) { + c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", (*m).Headers) + if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil { + c.logger.Errorf("Commit Error %s,", err.Error()) + } + continue + } + incomingMessage := &IncomingMessage{ kafkaMessage: m, message: fromKafkaMessage(m), diff --git a/consumer_base_test.go b/consumer_base_test.go index 65f60e0..602006f 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -57,6 +57,45 @@ func Test_base_startConsume(t *testing.T) { t.Error(diff) } }) + + t.Run("Skip_Incoming_Messages_When_SkipMessageByHeaderFn_Is_Applied", func(t *testing.T) { + // Given + mc := mockReader{} + skipMessageCh := make(chan struct{}) + b := base{ + wg: sync.WaitGroup{}, + r: &mc, + logger: NewZapLogger(LogLevelError), + incomingMessageStream: make(chan *IncomingMessage), + skipMessageByHeaderFn: func(header []kafka.Header) bool { + defer func() { + skipMessageCh <- struct{}{} + }() + + for _, h := range header { + if h.Key == "header" { + return true + } + } + return false + }, + } + + b.wg.Add(1) + + // When + go b.startConsume() + + // Then + <-skipMessageCh + + // assert incomingMessageStream does not receive any value because message is skipped + select { + case <-b.incomingMessageStream: + t.Fatal("incoming message stream must equal to 0") + case <-time.After(1 * time.Second): + } + }) } func Test_base_Pause(t *testing.T) { diff --git a/consumer_config.go b/consumer_config.go index d431348..a09436f 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -21,6 +21,8 @@ type PreBatchFn func([]*Message) []*Message type ConsumeFn func(*Message) error +type SkipMessageByHeaderFn func(header []kafka.Header) bool + type DialConfig struct { Timeout time.Duration KeepAlive time.Duration @@ -36,6 +38,7 @@ type ConsumerConfig struct { Dial *DialConfig BatchConfiguration *BatchConfiguration ConsumeFn ConsumeFn + SkipMessageByHeaderFn SkipMessageByHeaderFn TransactionalRetry *bool RetryConfiguration RetryConfiguration LogLevel LogLevel @@ -116,8 +119,6 @@ type DistributedTracingConfiguration struct { Propagator propagation.TextMapPropagator } -type SkipMessageByHeaderFn func(headers []Header) bool - func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header { headers := make([]Header, 0, len(cronsumerHeaders)) for i := range cronsumerHeaders { diff --git a/examples/with-header-filter-consumer/main.go b/examples/with-header-filter-consumer/main.go new file mode 100644 index 0000000..0c699cb --- /dev/null +++ b/examples/with-header-filter-consumer/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "os" + "os/signal" +) + +func main() { + consumerCfg := &kafka.ConsumerConfig{ + Concurrency: 1, + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + RetryEnabled: false, + SkipMessageByHeaderFn: skipMessageByHeaderFn, + ConsumeFn: consumeFn, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + fmt.Println("Consumer started...!") + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c +} + +func skipMessageByHeaderFn(headers []kafka.Header) bool { + for _, header := range headers { + if header.Key == "SkipMessage" { + return true + } + } + return false +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value)) + return nil +} diff --git a/test/integration/go.sum b/test/integration/go.sum index b75f3d8..5eb6b0d 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,4 @@ -github.com/Trendyol/kafka-cronsumer v1.4.6 h1:Hc6afln69+cCAyaYJSQRnjzH5gZ9dpNa/PsBeXiL5GY= -github.com/Trendyol/kafka-cronsumer v1.4.6/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index f4b4295..a7733f1 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -436,6 +436,63 @@ func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) { } } +func Test_Should_Skip_Message_When_Header_Filter_Given(t *testing.T) { + // Given + topic := "header-filter-topic" + consumerGroup := "header-filter-cg" + brokerAddress := "localhost:9092" + + incomingMessage := []segmentio.Message{ + { + Topic: topic, + Headers: []segmentio.Header{ + {Key: "SkipMessage", Value: []byte("any")}, + }, + Key: []byte("1"), + Value: []byte(`foo`), + }, + } + + _, cleanUp := createTopicAndWriteMessages(t, topic, incomingMessage) + defer cleanUp() + + consumeCh := make(chan struct{}) + skipMessageCh := make(chan struct{}) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + SkipMessageByHeaderFn: func(header []kafka.Header) bool { + defer func() { + skipMessageCh <- struct{}{} + }() + for _, h := range header { + if h.Key == "SkipMessage" { + return true + } + } + return false + }, + ConsumeFn: func(message *kafka.Message) error { + consumeCh <- struct{}{} + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + <-skipMessageCh + + select { + case <-consumeCh: + t.Fatal("Message must be skipped! consumeCh mustn't receive any value") + case <-time.After(1 * time.Second): + } +} + func createTopicAndWriteMessages(t *testing.T, topicName string, messages []segmentio.Message) (*segmentio.Conn, func()) { t.Helper() From 86ad20cb753aba4676fb8252fdbb90afa5dbba35 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Fri, 16 Feb 2024 00:42:08 +0300 Subject: [PATCH 02/13] chore: add skipMessageByHeaderFn to the README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 2ddba47..9d67c10 100644 --- a/README.md +++ b/README.md @@ -223,6 +223,7 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap |--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------| | `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | | | `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | | +| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | | `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | | `concurrency` | Number of goroutines used at listeners | 1 | | `retryEnabled` | Retry/Exception consumer is working or not | false | From 8c8d4b97d3a1b90ede142f11a3bb58803215f4b8 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Fri, 16 Feb 2024 00:45:16 +0300 Subject: [PATCH 03/13] chore: fix lint --- consumer_base.go | 2 +- consumer_base_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer_base.go b/consumer_base.go index 35f057b..69b50d1 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -162,7 +162,7 @@ func (c *base) startConsume() { } if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) { - c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", (*m).Headers) + c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", m.Headers) if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil { c.logger.Errorf("Commit Error %s,", err.Error()) } diff --git a/consumer_base_test.go b/consumer_base_test.go index 602006f..2bd07c9 100644 --- a/consumer_base_test.go +++ b/consumer_base_test.go @@ -65,7 +65,7 @@ func Test_base_startConsume(t *testing.T) { b := base{ wg: sync.WaitGroup{}, r: &mc, - logger: NewZapLogger(LogLevelError), + logger: NewZapLogger(LogLevelDebug), incomingMessageStream: make(chan *IncomingMessage), skipMessageByHeaderFn: func(header []kafka.Header) bool { defer func() { From a43b245ae9a52ea8dae917344c47de5b4bf8c739 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Wed, 27 Mar 2024 22:35:53 +0300 Subject: [PATCH 04/13] fix: x-error-message bug when transactional retry disable --- .../main.go | 19 ++- message.go | 13 ++ message_test.go | 121 ++++++++++++++++++ 3 files changed, 150 insertions(+), 3 deletions(-) diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index ecfcf96..5c05a22 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" @@ -10,6 +11,17 @@ import ( ) func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + DistributedTracingConfiguration: kafka.DistributedTracingConfiguration{}, + Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, + }) + + producer.ProduceBatch(context.Background(), []kafka.Message{ + {Value: []byte("message1")}, + {Value: []byte("message2")}, + {Value: []byte("message3")}, + }) + consumerCfg := &kafka.ConsumerConfig{ Reader: kafka.ReaderConfig{ Brokers: []string{"localhost:29092"}, @@ -25,8 +37,8 @@ func main() { RetryConfiguration: kafka.RetryConfiguration{ Brokers: []string{"localhost:29092"}, Topic: "retry-topic", - StartTimeCron: "*/5 * * * *", - WorkDuration: 4 * time.Minute, + StartTimeCron: "*/1 * * * *", + WorkDuration: 20 * time.Second, MaxRetry: 3, }, MessageGroupDuration: time.Second, @@ -48,8 +60,9 @@ func main() { func batchConsumeFn(messages []*kafka.Message) error { // you can add custom error handling here & flag messages for i := range messages { - if i%2 == 0 { + if i < 2 { messages[i].IsFailed = true + messages[i].ErrDescription = fmt.Sprintf("%d error", i+1) } } diff --git a/message.go b/message.go index a7281a8..f9cb45b 100644 --- a/message.go +++ b/message.go @@ -9,6 +9,10 @@ import ( "github.com/segmentio/kafka-go/protocol" ) +const ( + errMessageKey = "x-error-message" +) + type Header = protocol.Header type Message struct { @@ -27,6 +31,8 @@ type Message struct { // IsFailed Is only used on transactional retry disabled IsFailed bool + // ErrDescription specifies IsFailed message error + ErrDescription string } type IncomingMessage struct { @@ -72,6 +78,13 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { }) } + if m.ErrDescription != "" { + headers = append(headers, kcronsumer.Header{ + Key: errMessageKey, + Value: []byte(m.ErrDescription), + }) + } + return kcronsumer.NewMessageBuilder(). WithKey(m.Key). WithValue(m.Value). diff --git a/message_test.go b/message_test.go index ee10e58..bc2d57d 100644 --- a/message_test.go +++ b/message_test.go @@ -2,6 +2,7 @@ package kafka import ( "bytes" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "testing" "github.com/segmentio/kafka-go" @@ -113,3 +114,123 @@ func TestMessage_RemoveHeader(t *testing.T) { t.Fatalf("Header length must be equal to 0") } } + +func TestMessage_toRetryableMessage(t *testing.T) { + t.Run("When_error_description_exist", func(t *testing.T) { + // Given + message := Message{ + Key: []byte("key"), + Value: []byte("value"), + Headers: []Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + }, + ErrDescription: "some error description", + } + expected := kcronsumer.Message{ + Topic: "retry-topic", + Key: []byte("key"), + Value: []byte("value"), + Headers: []kcronsumer.Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + { + Key: "x-error-message", + Value: []byte("some error description"), + }, + }, + } + + // When + actual := message.toRetryableMessage("retry-topic") + + // Then + if actual.Topic != expected.Topic { + t.Errorf("topic must be %q", expected.Topic) + } + + if !bytes.Equal(actual.Key, expected.Key) { + t.Errorf("Key must be equal to %q", string(expected.Key)) + } + + if !bytes.Equal(actual.Value, expected.Value) { + t.Errorf("Value must be equal to %q", string(expected.Value)) + } + + if len(actual.Headers) != 2 { + t.Error("Header length must be equal to 2") + } + + if actual.Headers[0].Key != expected.Headers[0].Key { + t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key) + } + + if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) { + t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value) + } + + if actual.Headers[1].Key != expected.Headers[1].Key { + t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key) + } + + if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) { + t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value) + } + }) + t.Run("When_error_description_does_not_exist", func(t *testing.T) { + // Given + message := Message{ + Key: []byte("key"), + Value: []byte("value"), + Headers: []Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + }, + } + expected := kcronsumer.Message{ + Topic: "retry-topic", + Key: []byte("key"), + Value: []byte("value"), + Headers: []kcronsumer.Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + }, + } + + // When + actual := message.toRetryableMessage("retry-topic") + + // Then + if actual.Topic != expected.Topic { + t.Errorf("topic must be %q", expected.Topic) + } + + if !bytes.Equal(actual.Key, expected.Key) { + t.Errorf("Key must be equal to %q", string(expected.Key)) + } + + if !bytes.Equal(actual.Value, expected.Value) { + t.Errorf("Value must be equal to %q", string(expected.Value)) + } + + if len(actual.Headers) != 1 { + t.Error("Header length must be equal to 1") + } + + if actual.Headers[0].Key != expected.Headers[0].Key { + t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key) + } + + if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) { + t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value) + } + }) +} From 48986ac2005858c5fb5817e585e4a5f7ac740f4c Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 08:40:45 +0300 Subject: [PATCH 05/13] feat: add default x-error-message when errdescription does not exist --- batch_consumer.go | 6 ++++-- consumer.go | 2 +- message.go | 9 +++++++-- message_test.go | 20 ++++++++++++++++---- 4 files changed, 28 insertions(+), 9 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 60de663..4fb6e23 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -176,14 +176,16 @@ func (b *batchConsumer) process(chunkMessages []*Message) { if consumeErr != nil && b.retryEnabled { cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages)) + errorMessage := consumeErr.Error() if b.transactionalRetry { for i := range chunkMessages { - cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) + chunkMessages[i].ErrDescription = consumeErr.Error() + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage)) } } else { for i := range chunkMessages { if chunkMessages[i].IsFailed { - cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage)) } } } diff --git a/consumer.go b/consumer.go index 66f5e6b..daab85b 100644 --- a/consumer.go +++ b/consumer.go @@ -146,7 +146,7 @@ func (c *consumer) process(message *Message) { } if consumeErr != nil && c.retryEnabled { - retryableMsg := message.toRetryableMessage(c.retryTopic) + retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error()) if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil { c.logger.Errorf("Error producing message %s to exception/retry topic %s", string(retryableMsg.Value), produceErr.Error()) diff --git a/message.go b/message.go index f9cb45b..120cb73 100644 --- a/message.go +++ b/message.go @@ -69,7 +69,7 @@ func fromKafkaMessage(kafkaMessage *kafka.Message) *Message { return message } -func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { +func (m *Message) toRetryableMessage(retryTopic, consumeError string) kcronsumer.Message { headers := make([]kcronsumer.Header, 0, len(m.Headers)) for i := range m.Headers { headers = append(headers, kcronsumer.Header{ @@ -78,7 +78,12 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { }) } - if m.ErrDescription != "" { + if m.ErrDescription == "" { + headers = append(headers, kcronsumer.Header{ + Key: errMessageKey, + Value: []byte(consumeError), + }) + } else { headers = append(headers, kcronsumer.Header{ Key: errMessageKey, Value: []byte(m.ErrDescription), diff --git a/message_test.go b/message_test.go index bc2d57d..3a3fbd9 100644 --- a/message_test.go +++ b/message_test.go @@ -146,7 +146,7 @@ func TestMessage_toRetryableMessage(t *testing.T) { } // When - actual := message.toRetryableMessage("retry-topic") + actual := message.toRetryableMessage("retry-topic", "consumeFn error") // Then if actual.Topic != expected.Topic { @@ -202,11 +202,15 @@ func TestMessage_toRetryableMessage(t *testing.T) { Key: "x-custom-client-header", Value: []byte("bar"), }, + { + Key: "x-error-message", + Value: []byte("consumeFn error"), + }, }, } // When - actual := message.toRetryableMessage("retry-topic") + actual := message.toRetryableMessage("retry-topic", "consumeFn error") // Then if actual.Topic != expected.Topic { @@ -221,8 +225,8 @@ func TestMessage_toRetryableMessage(t *testing.T) { t.Errorf("Value must be equal to %q", string(expected.Value)) } - if len(actual.Headers) != 1 { - t.Error("Header length must be equal to 1") + if len(actual.Headers) != 2 { + t.Error("Header length must be equal to 2") } if actual.Headers[0].Key != expected.Headers[0].Key { @@ -232,5 +236,13 @@ func TestMessage_toRetryableMessage(t *testing.T) { if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) { t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value) } + + if actual.Headers[1].Key != expected.Headers[1].Key { + t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key) + } + + if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) { + t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value) + } }) } From bc8ff85c4d5056cb89f7887829201e5e4de73743 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 08:42:39 +0300 Subject: [PATCH 06/13] chore: some fixes --- batch_consumer.go | 1 - examples/with-kafka-transactional-retry-disabled/main.go | 3 +-- message.go | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 4fb6e23..ba2825c 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -179,7 +179,6 @@ func (b *batchConsumer) process(chunkMessages []*Message) { errorMessage := consumeErr.Error() if b.transactionalRetry { for i := range chunkMessages { - chunkMessages[i].ErrDescription = consumeErr.Error() cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage)) } } else { diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index 5c05a22..d3bee3c 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -12,8 +12,7 @@ import ( func main() { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - DistributedTracingConfiguration: kafka.DistributedTracingConfiguration{}, - Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, + Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, }) producer.ProduceBatch(context.Background(), []kafka.Message{ diff --git a/message.go b/message.go index 120cb73..28e5072 100644 --- a/message.go +++ b/message.go @@ -31,7 +31,7 @@ type Message struct { // IsFailed Is only used on transactional retry disabled IsFailed bool - // ErrDescription specifies IsFailed message error + // ErrDescription specifies the IsFailed message's error ErrDescription string } From 4fb19bc8ace714a1fe233182e34735cec329ca26 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 08:46:34 +0300 Subject: [PATCH 07/13] chore: add more description --- message.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/message.go b/message.go index 28e5072..58cb114 100644 --- a/message.go +++ b/message.go @@ -31,7 +31,11 @@ type Message struct { // IsFailed Is only used on transactional retry disabled IsFailed bool + // ErrDescription specifies the IsFailed message's error + + // If available, kafka-konsumer writes this description into the failed message's + // headers as `x-error-message` key when producing retry topic ErrDescription string } From 6be6336b08cd58382e882b030c2dbb6253eb4fda Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 08:56:01 +0300 Subject: [PATCH 08/13] chore: fix lint --- message_test.go | 3 ++- test/integration/go.mod | 10 +++++----- test/integration/go.sum | 23 +++++++++++------------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/message_test.go b/message_test.go index 3a3fbd9..03e5012 100644 --- a/message_test.go +++ b/message_test.go @@ -2,9 +2,10 @@ package kafka import ( "bytes" - kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" "testing" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "github.com/segmentio/kafka-go" ) diff --git a/test/integration/go.mod b/test/integration/go.mod index 1e0444d..3c5f5e6 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -19,12 +19,12 @@ require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect - github.com/gofiber/fiber/v2 v2.50.0 // indirect + github.com/gofiber/fiber/v2 v2.52.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.1 // indirect + github.com/google/uuid v1.5.0 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect @@ -35,7 +35,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.50.0 // indirect + github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -46,7 +46,7 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/sys v0.13.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 5e87da8..78f67c5 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,6 +1,4 @@ -github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= -github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= @@ -23,24 +21,24 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= -github.com/gofiber/fiber/v2 v2.50.0 h1:ia0JaB+uw3GpNSCR5nvC5dsaxXjRU5OEu36aytx+zGw= -github.com/gofiber/fiber/v2 v2.50.0/go.mod h1:21eytvay9Is7S6z+OgPi7c7n4++tnClWmhpimVHMimw= +github.com/gofiber/fiber/v2 v2.52.1 h1:1RoU2NS+b98o1L77sdl5mboGPiW+0Ypsi5oLmcYlgHI= +github.com/gofiber/fiber/v2 v2.52.1/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -73,8 +71,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= -github.com/valyala/fasthttp v1.50.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -122,8 +120,9 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= From 38a5c2c05347727be03559162079bff9a593c14f Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 09:18:01 +0300 Subject: [PATCH 09/13] chore: update integration test image and some fixes --- test/integration/docker-compose.yml | 2 +- test/integration/integration_test.go | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 846a87d..476b6b8 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.2.4 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 container_name: redpanda-1 command: - redpanda diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index a7733f1..e4d47fb 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -247,9 +247,9 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di MaxRetry: 3, LogLevel: "error", }, - MessageGroupDuration: 5 * time.Second, + MessageGroupDuration: 10 * time.Second, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 100, + MessageGroupLimit: 5, BatchConsumeFn: func(messages []*kafka.Message) error { messages[1].IsFailed = true return errors.New("err") @@ -369,18 +369,23 @@ func Test_Should_Propagate_Custom_Headers_With_Kafka_Cronsumer_Successfully(t *t assertEventually(t, conditionFunc, 45*time.Second, time.Second) msg, err := retryConn.ReadMessage(10_000) if err != nil { - t.Fatalf("error reading message") + t.Fatal("error reading message") } - if len(msg.Headers) != 1 { - t.Fatalf("msg header must be length of 1") + if len(msg.Headers) != 2 { + t.Fatal("msg header must be length of 2") } if msg.Headers[0].Key != "custom_exception_header" { - t.Fatalf("key must be custom_exception_header") + t.Fatal("key must be custom_exception_header") } if !bytes.Equal(msg.Headers[0].Value, []byte("custom_exception_value")) { - t.Fatalf("value must be custom_exception_value") + t.Fatal("value must be custom_exception_value") + } + if msg.Headers[1].Key != "x-error-message" { + t.Fatal("key must be x-error-message") + } + if !bytes.Equal(msg.Headers[1].Value, []byte("err occurred")) { + t.Fatal("err occurred") } - _ = msg } func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) { From d0653f1837409bb9e232031c9253b14bcf772c82 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 09:22:32 +0300 Subject: [PATCH 10/13] chore: integration test docker-image amd64 --- test/integration/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 476b6b8..b62f977 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda From b86c0af230d571fdc0821021bf0371b627f06a17 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Thu, 28 Mar 2024 09:27:31 +0300 Subject: [PATCH 11/13] chore: fix integration test --- test/integration/integration_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index e4d47fb..fc3751e 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -242,12 +242,11 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di RetryConfiguration: kafka.RetryConfiguration{ Brokers: []string{brokerAddress}, Topic: retryTopic, - StartTimeCron: "*/1 * * * *", - WorkDuration: 50 * time.Second, + StartTimeCron: "*/5 * * * *", + WorkDuration: 4 * time.Minute, MaxRetry: 3, - LogLevel: "error", }, - MessageGroupDuration: 10 * time.Second, + MessageGroupDuration: 20 * time.Second, BatchConfiguration: &kafka.BatchConfiguration{ MessageGroupLimit: 5, BatchConsumeFn: func(messages []*kafka.Message) error { From 9235292ad39dfd19f37d7e37bc730c5915e97c05 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Thu, 28 Mar 2024 21:47:03 +0300 Subject: [PATCH 12/13] feat: cronsumer iteration imp related x-error-message --- batch_consumer.go | 13 ++++++- batch_consumer_test.go | 35 +++++++++++++++++++ .../main.go | 19 ++++++---- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index ba2825c..98031ef 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -42,7 +43,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { if cfg.RetryEnabled { c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.consumeFn([]*Message{toMessage(message)}) + return c.runKonsumerFn(message) }) } @@ -53,6 +54,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } +func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error { + msgList := []*Message{toMessage(message)} + + err := b.consumeFn(msgList) + if msgList[0].ErrDescription != "" { + err = errors.New(msgList[0].ErrDescription) + } + return err +} + func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector { return b.base.GetMetricCollectors() } diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 1b390ec..d36643c 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) { } } +func Test_batchConsumer_runKonsumerFn(t *testing.T) { + t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("default error") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + return expectedError + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) + + t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("message error description") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + messages[0].ErrDescription = "message error description" + return errors.New("default error") + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) +} + func createMessages(partitionStart int, partitionEnd int) []*Message { messages := make([]*Message, 0) for i := partitionStart; i < partitionEnd; i++ { diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index d3bee3c..09f0429 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -16,9 +16,9 @@ func main() { }) producer.ProduceBatch(context.Background(), []kafka.Message{ - {Value: []byte("message1")}, - {Value: []byte("message2")}, - {Value: []byte("message3")}, + {Key: []byte("key1"), Value: []byte("message1")}, + {Key: []byte("key2"), Value: []byte("message2")}, + {Key: []byte("key3"), Value: []byte("message3")}, }) consumerCfg := &kafka.ConsumerConfig{ @@ -40,7 +40,7 @@ func main() { WorkDuration: 20 * time.Second, MaxRetry: 3, }, - MessageGroupDuration: time.Second, + MessageGroupDuration: 5 * time.Second, } consumer, _ := kafka.NewConsumer(consumerCfg) @@ -54,14 +54,19 @@ func main() { <-c } -// In order to load topic with data, use: -// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt func batchConsumeFn(messages []*kafka.Message) error { // you can add custom error handling here & flag messages for i := range messages { if i < 2 { messages[i].IsFailed = true - messages[i].ErrDescription = fmt.Sprintf("%d error", i+1) + + var retryCount string + retryCountHeader := messages[i].Header("x-retry-count") + if retryCountHeader != nil { + retryCount = string(retryCountHeader.Value) + } + + messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount) } } From 532d1a448d1a2532e2c56c985cae30c264044d15 Mon Sep 17 00:00:00 2001 From: dilaragorum Date: Thu, 28 Mar 2024 21:49:51 +0300 Subject: [PATCH 13/13] chore: fix lint --- batch_consumer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/batch_consumer.go b/batch_consumer.go index 98031ef..647d065 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -42,9 +42,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { } if cfg.RetryEnabled { - c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.runKonsumerFn(message) - }) + c.base.setupCronsumer(cfg, c.runKonsumerFn) } if cfg.APIEnabled {