diff --git a/batch_consumer.go b/batch_consumer.go index 60de663..647d065 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "errors" "time" "github.com/prometheus/client_golang/prometheus" @@ -41,9 +42,7 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { } if cfg.RetryEnabled { - c.base.setupCronsumer(cfg, func(message kcronsumer.Message) error { - return c.consumeFn([]*Message{toMessage(message)}) - }) + c.base.setupCronsumer(cfg, c.runKonsumerFn) } if cfg.APIEnabled { @@ -53,6 +52,16 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } +func (b *batchConsumer) runKonsumerFn(message kcronsumer.Message) error { + msgList := []*Message{toMessage(message)} + + err := b.consumeFn(msgList) + if msgList[0].ErrDescription != "" { + err = errors.New(msgList[0].ErrDescription) + } + return err +} + func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector { return b.base.GetMetricCollectors() } @@ -176,14 +185,15 @@ func (b *batchConsumer) process(chunkMessages []*Message) { if consumeErr != nil && b.retryEnabled { cronsumerMessages := make([]kcronsumer.Message, 0, len(chunkMessages)) + errorMessage := consumeErr.Error() if b.transactionalRetry { for i := range chunkMessages { - cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage)) } } else { for i := range chunkMessages { if chunkMessages[i].IsFailed { - cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic)) + cronsumerMessages = append(cronsumerMessages, chunkMessages[i].toRetryableMessage(b.retryTopic, errorMessage)) } } } diff --git a/batch_consumer_test.go b/batch_consumer_test.go index 1b390ec..d36643c 100644 --- a/batch_consumer_test.go +++ b/batch_consumer_test.go @@ -404,6 +404,41 @@ func Test_batchConsumer_Resume(t *testing.T) { } } +func Test_batchConsumer_runKonsumerFn(t *testing.T) { + t.Run("Should_Return_Default_Error_When_Error_Description_Does_Not_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("default error") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + return expectedError + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) + + t.Run("Should_Return_Message_Error_Description_When_Error_Description_Exist", func(t *testing.T) { + // Given + expectedError := errors.New("message error description") + bc := batchConsumer{consumeFn: func(messages []*Message) error { + messages[0].ErrDescription = "message error description" + return errors.New("default error") + }} + + // When + actualError := bc.runKonsumerFn(kcronsumer.Message{}) + + // Then + if actualError.Error() != expectedError.Error() { + t.Fatalf("actual error = %s should be equal to expected error = %s", actualError.Error(), expectedError.Error()) + } + }) +} + func createMessages(partitionStart int, partitionEnd int) []*Message { messages := make([]*Message, 0) for i := partitionStart; i < partitionEnd; i++ { diff --git a/consumer.go b/consumer.go index 66f5e6b..daab85b 100644 --- a/consumer.go +++ b/consumer.go @@ -146,7 +146,7 @@ func (c *consumer) process(message *Message) { } if consumeErr != nil && c.retryEnabled { - retryableMsg := message.toRetryableMessage(c.retryTopic) + retryableMsg := message.toRetryableMessage(c.retryTopic, consumeErr.Error()) if produceErr := c.cronsumer.Produce(retryableMsg); produceErr != nil { c.logger.Errorf("Error producing message %s to exception/retry topic %s", string(retryableMsg.Value), produceErr.Error()) diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index ecfcf96..09f0429 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" @@ -10,6 +11,16 @@ import ( ) func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, + }) + + producer.ProduceBatch(context.Background(), []kafka.Message{ + {Key: []byte("key1"), Value: []byte("message1")}, + {Key: []byte("key2"), Value: []byte("message2")}, + {Key: []byte("key3"), Value: []byte("message3")}, + }) + consumerCfg := &kafka.ConsumerConfig{ Reader: kafka.ReaderConfig{ Brokers: []string{"localhost:29092"}, @@ -25,11 +36,11 @@ func main() { RetryConfiguration: kafka.RetryConfiguration{ Brokers: []string{"localhost:29092"}, Topic: "retry-topic", - StartTimeCron: "*/5 * * * *", - WorkDuration: 4 * time.Minute, + StartTimeCron: "*/1 * * * *", + WorkDuration: 20 * time.Second, MaxRetry: 3, }, - MessageGroupDuration: time.Second, + MessageGroupDuration: 5 * time.Second, } consumer, _ := kafka.NewConsumer(consumerCfg) @@ -43,13 +54,19 @@ func main() { <-c } -// In order to load topic with data, use: -// kafka-console-producer --broker-list localhost:29092 --topic standart-topic < examples/load.txt func batchConsumeFn(messages []*kafka.Message) error { // you can add custom error handling here & flag messages for i := range messages { - if i%2 == 0 { + if i < 2 { messages[i].IsFailed = true + + var retryCount string + retryCountHeader := messages[i].Header("x-retry-count") + if retryCountHeader != nil { + retryCount = string(retryCountHeader.Value) + } + + messages[i].ErrDescription = fmt.Sprintf("Key = %s error, retry count %s", string(messages[i].Key), retryCount) } } diff --git a/message.go b/message.go index a7281a8..58cb114 100644 --- a/message.go +++ b/message.go @@ -9,6 +9,10 @@ import ( "github.com/segmentio/kafka-go/protocol" ) +const ( + errMessageKey = "x-error-message" +) + type Header = protocol.Header type Message struct { @@ -27,6 +31,12 @@ type Message struct { // IsFailed Is only used on transactional retry disabled IsFailed bool + + // ErrDescription specifies the IsFailed message's error + + // If available, kafka-konsumer writes this description into the failed message's + // headers as `x-error-message` key when producing retry topic + ErrDescription string } type IncomingMessage struct { @@ -63,7 +73,7 @@ func fromKafkaMessage(kafkaMessage *kafka.Message) *Message { return message } -func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { +func (m *Message) toRetryableMessage(retryTopic, consumeError string) kcronsumer.Message { headers := make([]kcronsumer.Header, 0, len(m.Headers)) for i := range m.Headers { headers = append(headers, kcronsumer.Header{ @@ -72,6 +82,18 @@ func (m *Message) toRetryableMessage(retryTopic string) kcronsumer.Message { }) } + if m.ErrDescription == "" { + headers = append(headers, kcronsumer.Header{ + Key: errMessageKey, + Value: []byte(consumeError), + }) + } else { + headers = append(headers, kcronsumer.Header{ + Key: errMessageKey, + Value: []byte(m.ErrDescription), + }) + } + return kcronsumer.NewMessageBuilder(). WithKey(m.Key). WithValue(m.Value). diff --git a/message_test.go b/message_test.go index ee10e58..03e5012 100644 --- a/message_test.go +++ b/message_test.go @@ -4,6 +4,8 @@ import ( "bytes" "testing" + kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" + "github.com/segmentio/kafka-go" ) @@ -113,3 +115,135 @@ func TestMessage_RemoveHeader(t *testing.T) { t.Fatalf("Header length must be equal to 0") } } + +func TestMessage_toRetryableMessage(t *testing.T) { + t.Run("When_error_description_exist", func(t *testing.T) { + // Given + message := Message{ + Key: []byte("key"), + Value: []byte("value"), + Headers: []Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + }, + ErrDescription: "some error description", + } + expected := kcronsumer.Message{ + Topic: "retry-topic", + Key: []byte("key"), + Value: []byte("value"), + Headers: []kcronsumer.Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + { + Key: "x-error-message", + Value: []byte("some error description"), + }, + }, + } + + // When + actual := message.toRetryableMessage("retry-topic", "consumeFn error") + + // Then + if actual.Topic != expected.Topic { + t.Errorf("topic must be %q", expected.Topic) + } + + if !bytes.Equal(actual.Key, expected.Key) { + t.Errorf("Key must be equal to %q", string(expected.Key)) + } + + if !bytes.Equal(actual.Value, expected.Value) { + t.Errorf("Value must be equal to %q", string(expected.Value)) + } + + if len(actual.Headers) != 2 { + t.Error("Header length must be equal to 2") + } + + if actual.Headers[0].Key != expected.Headers[0].Key { + t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key) + } + + if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) { + t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value) + } + + if actual.Headers[1].Key != expected.Headers[1].Key { + t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key) + } + + if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) { + t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value) + } + }) + t.Run("When_error_description_does_not_exist", func(t *testing.T) { + // Given + message := Message{ + Key: []byte("key"), + Value: []byte("value"), + Headers: []Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + }, + } + expected := kcronsumer.Message{ + Topic: "retry-topic", + Key: []byte("key"), + Value: []byte("value"), + Headers: []kcronsumer.Header{ + { + Key: "x-custom-client-header", + Value: []byte("bar"), + }, + { + Key: "x-error-message", + Value: []byte("consumeFn error"), + }, + }, + } + + // When + actual := message.toRetryableMessage("retry-topic", "consumeFn error") + + // Then + if actual.Topic != expected.Topic { + t.Errorf("topic must be %q", expected.Topic) + } + + if !bytes.Equal(actual.Key, expected.Key) { + t.Errorf("Key must be equal to %q", string(expected.Key)) + } + + if !bytes.Equal(actual.Value, expected.Value) { + t.Errorf("Value must be equal to %q", string(expected.Value)) + } + + if len(actual.Headers) != 2 { + t.Error("Header length must be equal to 2") + } + + if actual.Headers[0].Key != expected.Headers[0].Key { + t.Errorf("First Header key must be equal to %q", expected.Headers[0].Key) + } + + if !bytes.Equal(actual.Headers[0].Value, expected.Headers[0].Value) { + t.Errorf("First Header value must be equal to %q", expected.Headers[0].Value) + } + + if actual.Headers[1].Key != expected.Headers[1].Key { + t.Errorf("Second Header key must be equal to %q", expected.Headers[1].Key) + } + + if !bytes.Equal(actual.Headers[1].Value, expected.Headers[1].Value) { + t.Errorf("Second Header value must be equal to %q", expected.Headers[1].Value) + } + }) +} diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 846a87d..b62f977 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.2.4 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda diff --git a/test/integration/go.mod b/test/integration/go.mod index 1e0444d..3c5f5e6 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -19,12 +19,12 @@ require ( github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gofiber/adaptor/v2 v2.2.1 // indirect - github.com/gofiber/fiber/v2 v2.50.0 // indirect + github.com/gofiber/fiber/v2 v2.52.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/uuid v1.3.1 // indirect + github.com/google/uuid v1.5.0 // indirect github.com/klauspost/compress v1.17.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.15 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/pierrec/lz4/v4 v4.1.18 // indirect @@ -35,7 +35,7 @@ require ( github.com/rivo/uniseg v0.4.4 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.50.0 // indirect + github.com/valyala/fasthttp v1.51.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect @@ -46,7 +46,7 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/sys v0.13.0 // indirect + golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/protobuf v1.30.0 // indirect ) diff --git a/test/integration/go.sum b/test/integration/go.sum index 5e87da8..78f67c5 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,6 +1,4 @@ -github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= -github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= -github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= @@ -23,24 +21,24 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/gofiber/adaptor/v2 v2.2.1 h1:givE7iViQWlsTR4Jh7tB4iXzrlKBgiraB/yTdHs9Lv4= github.com/gofiber/adaptor/v2 v2.2.1/go.mod h1:AhR16dEqs25W2FY/l8gSj1b51Azg5dtPDmm+pruNOrc= -github.com/gofiber/fiber/v2 v2.50.0 h1:ia0JaB+uw3GpNSCR5nvC5dsaxXjRU5OEu36aytx+zGw= -github.com/gofiber/fiber/v2 v2.50.0/go.mod h1:21eytvay9Is7S6z+OgPi7c7n4++tnClWmhpimVHMimw= +github.com/gofiber/fiber/v2 v2.52.1 h1:1RoU2NS+b98o1L77sdl5mboGPiW+0Ypsi5oLmcYlgHI= +github.com/gofiber/fiber/v2 v2.52.1/go.mod h1:KEOE+cXMhXG0zHc9d8+E38hoX+ZN7bhOtgeF2oT6jrQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= -github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= -github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= -github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= @@ -73,8 +71,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.50.0 h1:H7fweIlBm0rXLs2q0XbalvJ6r0CUPFWK3/bB4N13e9M= -github.com/valyala/fasthttp v1.50.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= +github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= +github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -122,8 +120,9 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index a7733f1..fc3751e 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -242,14 +242,13 @@ func Test_Should_Batch_Retry_Only_Failed_Messages_When_Transactional_Retry_Is_Di RetryConfiguration: kafka.RetryConfiguration{ Brokers: []string{brokerAddress}, Topic: retryTopic, - StartTimeCron: "*/1 * * * *", - WorkDuration: 50 * time.Second, + StartTimeCron: "*/5 * * * *", + WorkDuration: 4 * time.Minute, MaxRetry: 3, - LogLevel: "error", }, - MessageGroupDuration: 5 * time.Second, + MessageGroupDuration: 20 * time.Second, BatchConfiguration: &kafka.BatchConfiguration{ - MessageGroupLimit: 100, + MessageGroupLimit: 5, BatchConsumeFn: func(messages []*kafka.Message) error { messages[1].IsFailed = true return errors.New("err") @@ -369,18 +368,23 @@ func Test_Should_Propagate_Custom_Headers_With_Kafka_Cronsumer_Successfully(t *t assertEventually(t, conditionFunc, 45*time.Second, time.Second) msg, err := retryConn.ReadMessage(10_000) if err != nil { - t.Fatalf("error reading message") + t.Fatal("error reading message") } - if len(msg.Headers) != 1 { - t.Fatalf("msg header must be length of 1") + if len(msg.Headers) != 2 { + t.Fatal("msg header must be length of 2") } if msg.Headers[0].Key != "custom_exception_header" { - t.Fatalf("key must be custom_exception_header") + t.Fatal("key must be custom_exception_header") } if !bytes.Equal(msg.Headers[0].Value, []byte("custom_exception_value")) { - t.Fatalf("value must be custom_exception_value") + t.Fatal("value must be custom_exception_value") + } + if msg.Headers[1].Key != "x-error-message" { + t.Fatal("key must be x-error-message") + } + if !bytes.Equal(msg.Headers[1].Value, []byte("err occurred")) { + t.Fatal("err occurred") } - _ = msg } func Test_Should_Batch_Consume_With_PreBatch_Enabled(t *testing.T) {