From 175b1dcc596d54ed5e9c8f38901bf6b4f7d229f0 Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Fri, 2 Aug 2024 00:25:51 +0300 Subject: [PATCH 1/2] feat: change skip message log level to debug and add Pretty for writing headers --- consumer_base.go | 12 ++++++------ message.go | 15 ++++++++++++++- message_test.go | 16 ++++++++++++++++ test/integration/docker-compose.yml | 2 +- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/consumer_base.go b/consumer_base.go index 9231a3a..87a3bbd 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -205,19 +205,19 @@ func (c *base) startConsume() { continue } + incomingMessage := &IncomingMessage{ + kafkaMessage: m, + message: fromKafkaMessage(m), + } + if c.skipMessageByHeaderFn != nil && c.skipMessageByHeaderFn(m.Headers) { - c.logger.Infof("Message is not processed. Header filter applied. Headers: %v", m.Headers) + c.logger.Debugf("Message is not processed. Header filter applied. Headers: %v", incomingMessage.message.Headers.Pretty()) if err = c.r.CommitMessages([]kafka.Message{*m}); err != nil { c.logger.Errorf("Commit Error %s,", err.Error()) } continue } - incomingMessage := &IncomingMessage{ - kafkaMessage: m, - message: fromKafkaMessage(m), - } - if c.distributedTracingEnabled { incomingMessage.message.Context = c.propagator.Extract(context.Background(), otelkafkakonsumer.NewMessageCarrier(m)) } diff --git a/message.go b/message.go index 2022eba..32e3b55 100644 --- a/message.go +++ b/message.go @@ -2,6 +2,8 @@ package kafka import ( "context" + "fmt" + "strings" "time" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -15,6 +17,17 @@ const ( type Header = protocol.Header +type Headers []Header + +// Pretty Writes every header key and value, it is useful for debugging purpose +func (hs Headers) Pretty() string { + headerStrings := make([]string, len(hs)) + for i := range hs { + headerStrings[i] = fmt.Sprintf("%s: %s", hs[i].Key, string(hs[i].Value)) + } + return strings.Join(headerStrings, ", ") +} + type Message struct { Time time.Time WriterData interface{} @@ -24,7 +37,7 @@ type Message struct { Topic string Key []byte Value []byte - Headers []Header + Headers Headers Partition int Offset int64 HighWaterMark int64 diff --git a/message_test.go b/message_test.go index b9dcd3f..482d4dc 100644 --- a/message_test.go +++ b/message_test.go @@ -266,3 +266,19 @@ func TestMessage_toRetryableMessage(t *testing.T) { } }) } + +func TestHeaders_Pretty(t *testing.T) { + // Given + headers := Headers{ + {Key: "key1", Value: []byte("value1")}, + {Key: "key2", Value: []byte("value2")}, + } + + // When + result := headers.Pretty() + + // Then + if result != "key1: value1, key2: value2" { + t.Error("result must be `key1: value1, key2: value2`") + } +} diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index b62f977..351658a 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda From 6aaff78a0f88fec915ca7e49e0d7f684159f9dee Mon Sep 17 00:00:00 2001 From: Abdulsametileri Date: Fri, 2 Aug 2024 00:27:11 +0300 Subject: [PATCH 2/2] chore: change image file to amd64 version --- test/integration/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 351658a..b62f977 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda