From c45b273a07d70eb53ee92f4ca433865c60941015 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 19 Sep 2024 18:12:03 +0300 Subject: [PATCH 1/7] feat: add producer interceptor interface --- batch_producer_interceptor.go | 14 ++ examples/with-deadletter/main.go | 2 +- examples/with-kafka-producer/main.go | 2 +- .../main.go | 2 +- producer.go | 28 ++-- producer_interceptor.go | 14 ++ producer_test.go | 39 ++++- test/integration/docker-compose.yml | 2 +- test/integration/integration_test.go | 134 ++++++++++++++---- test/stub-data/producer_interceptor_stub.go | 21 +++ 10 files changed, 217 insertions(+), 41 deletions(-) create mode 100644 batch_producer_interceptor.go create mode 100644 producer_interceptor.go create mode 100644 test/stub-data/producer_interceptor_stub.go diff --git a/batch_producer_interceptor.go b/batch_producer_interceptor.go new file mode 100644 index 0000000..e00b6dc --- /dev/null +++ b/batch_producer_interceptor.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "context" +) + +type BatchProducerInterceptorContext struct { + Context context.Context + Message *Message +} + +type BatchProducerInterceptor interface { + OnProduce(ctx context.Context, msg Message) +} diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go index 7c7f1dc..29ed729 100644 --- a/examples/with-deadletter/main.go +++ b/examples/with-deadletter/main.go @@ -21,7 +21,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }) + }, nil) _ = producer.Produce(context.Background(), kafka.Message{ Topic: topicName, diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index f4522f5..67f66b9 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -10,7 +10,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }) + }, nil) const topicName = "standart-topic" diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index 09f0429..d7ba9fa 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -13,7 +13,7 @@ import ( func main() { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, - }) + }, nil) producer.ProduceBatch(context.Background(), []kafka.Message{ {Key: []byte("key1"), Value: []byte("message1")}, diff --git a/producer.go b/producer.go index ee2910d..3a51574 100644 --- a/producer.go +++ b/producer.go @@ -18,10 +18,11 @@ type Writer interface { } type producer struct { - w Writer + w Writer + interceptor *ProducerInterceptor } -func NewProducer(cfg *ProducerConfig) (Producer, error) { +func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { kafkaWriter.Transport = transport } - p := &producer{w: kafkaWriter} + p := &producer{w: kafkaWriter, interceptor: interceptor} if cfg.DistributedTracingEnabled { otelWriter, err := NewOtelProducer(cfg, kafkaWriter) @@ -64,18 +65,27 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) { return p, nil } -func (c *producer) Produce(ctx context.Context, message Message) error { - return c.w.WriteMessages(ctx, message.toKafkaMessage()) +func (p *producer) Produce(ctx context.Context, message Message) error { + if p.interceptor != nil { + (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message}) + } + + return p.w.WriteMessages(ctx, message.toKafkaMessage()) } -func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { +func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { + if p.interceptor != nil { + (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]}) + } + kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) } - return c.w.WriteMessages(ctx, kafkaMessages...) + + return p.w.WriteMessages(ctx, kafkaMessages...) } -func (c *producer) Close() error { - return c.w.Close() +func (p *producer) Close() error { + return p.w.Close() } diff --git a/producer_interceptor.go b/producer_interceptor.go new file mode 100644 index 0000000..b5bc6b9 --- /dev/null +++ b/producer_interceptor.go @@ -0,0 +1,14 @@ +package kafka + +import ( + "context" +) + +type ProducerInterceptorContext struct { + Context context.Context + Message *Message +} + +type ProducerInterceptor interface { + OnProduce(ctx ProducerInterceptorContext) +} diff --git a/producer_test.go b/producer_test.go index d4fd8ac..02cfb87 100644 --- a/producer_test.go +++ b/producer_test.go @@ -2,6 +2,8 @@ package kafka import ( "context" + stubData "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" + "github.com/gofiber/fiber/v2/utils" "testing" "github.com/segmentio/kafka-go" @@ -20,6 +22,27 @@ func Test_producer_Produce_Successfully(t *testing.T) { } } +func Test_producer_Produce_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + msg := Message{Headers: make([]Header, 0)} + msg.Headers = append(msg.Headers, kafka.Header{ + Key: "x-correlation-id", + Value: []byte(utils.UUIDv4()), + }) + interceptor := stubData.NewMockProducerInterceptor() + + p := producer{w: mw, interceptor: &interceptor} + + // When + err := p.Produce(context.Background(), msg) + + // Then + if err != nil { + t.Fatalf("Producing err %s", err.Error()) + } +} + func Test_producer_ProduceBatch_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -33,6 +56,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) { } } +func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { + // Given + mw := &mockWriter{} + interceptor := stubData.NewMockProducerInterceptor() + p := producer{w: mw, interceptor: &interceptor} + + // When + err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) + // Then + if err != nil { + t.Fatalf("Batch Producing err %s", err.Error()) + } +} + func Test_producer_Close_Successfully(t *testing.T) { // Given mw := &mockWriter{} @@ -48,7 +85,7 @@ func Test_producer_Close_Successfully(t *testing.T) { type mockWriter struct{} -func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error { +func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error { return nil } diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index b62f977..351658a 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index af93446..3adea00 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" + stub_data "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" segmentio "github.com/segmentio/kafka-go" "testing" "time" @@ -14,28 +15,92 @@ import ( func Test_Should_Produce_Successfully(t *testing.T) { // Given t.Parallel() - topic := "produce-topic" brokerAddress := "localhost:9092" - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, - Transport: &kafka.TransportConfig{ - MetadataTopics: []string{ - topic, + t.Run("without interceptor", func(t *testing.T) { + //Given + + topic := "produce-topic" + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, }, - }, - }) + }, nil) - // When - err := producer.Produce(context.Background(), kafka.Message{ - Key: []byte("1"), - Value: []byte(`foo`), + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } }) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + t.Run("with interceptor", func(t *testing.T) { + // Given + topic := "produce-interceptor-topic" + consumerGroup := "produce-topic-cg" + interceptor := stub_data.NewMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, + Transport: &kafka.TransportConfig{ + MetadataTopics: []string{ + topic, + }, + }, + }, &interceptor) + + // When + err := producer.Produce(context.Background(), kafka.Message{ + Key: []byte("1"), + Value: []byte(`foo`), + }) + + messageCh := make(chan *kafka.Message) + + consumerCfg := &kafka.ConsumerConfig{ + Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup}, + ConsumeFn: func(message *kafka.Message) error { + messageCh <- message + return nil + }, + } + + consumer, _ := kafka.NewConsumer(consumerCfg) + defer consumer.Stop() + + consumer.Consume() + + // Then + + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + + actual := <-messageCh + if string(actual.Value) != "foo" { + t.Fatalf("Value does not equal %s", actual.Value) + } + if string(actual.Key) != "1" { + t.Fatalf("Key does not equal %s", actual.Key) + } + if len(actual.Headers) != 1 { + t.Fatalf("Header size does not equal %d", len(actual.Headers)) + } + if string(actual.Headers[0].Key) != stub_data.XSourceAppKey { + t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) + } + if string(actual.Headers[0].Value) != stub_data.XSourceAppValue { + t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) + } + }) } func Test_Should_Batch_Produce_Successfully(t *testing.T) { @@ -43,11 +108,6 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { t.Parallel() topic := "batch-produce-topic" brokerAddress := "localhost:9092" - - producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) - - // When msgs := []kafka.Message{ { Key: []byte("1"), @@ -59,13 +119,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }, } - // When - err := producer.ProduceBatch(context.Background(), msgs) + t.Run("without interceptor", func(t *testing.T) { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil) - // Then - if err != nil { - t.Fatalf("Error while producing err %s", err.Error()) - } + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) + + t.Run("with interceptor", func(t *testing.T) { + interceptor := stub_data.NewMockProducerInterceptor() + + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor) + + // When + err := producer.ProduceBatch(context.Background(), msgs) + + // Then + if err != nil { + t.Fatalf("Error while producing err %s", err.Error()) + } + }) } func Test_Should_Consume_Message_Successfully(t *testing.T) { diff --git a/test/stub-data/producer_interceptor_stub.go b/test/stub-data/producer_interceptor_stub.go new file mode 100644 index 0000000..a82ae5e --- /dev/null +++ b/test/stub-data/producer_interceptor_stub.go @@ -0,0 +1,21 @@ +package test + +import "github.com/Trendyol/kafka-konsumer/v2" + +type MockProducerInterceptor struct{} + +const ( + XSourceAppKey = "x-source-app" + XSourceAppValue = "kafka-konsumer" +) + +func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: XSourceAppKey, + Value: []byte(XSourceAppValue), + }) +} + +func NewMockProducerInterceptor() kafka.ProducerInterceptor { + return &MockProducerInterceptor{} +} From 33a0f06e7d45f366b3c80a6f413dce2a58b5f996 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 19 Sep 2024 18:24:32 +0300 Subject: [PATCH 2/7] fix: redpanda docker image --- test/integration/docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/docker-compose.yml b/test/integration/docker-compose.yml index 351658a..b62f977 100644 --- a/test/integration/docker-compose.yml +++ b/test/integration/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.8" services: redpanda: - image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64 + image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64 container_name: redpanda-1 command: - redpanda From ebcb7c080e75625c8f471b96dea41e1eb0135f66 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 19 Sep 2024 18:40:07 +0300 Subject: [PATCH 3/7] fix: producer tests --- batch_producer_interceptor.go | 14 -------------- examples/with-grafana/main.go | 2 +- producer_test.go | 18 +++++++++++++++--- test/integration/integration_test.go | 10 +++++----- .../producer_interceptor_stub.go | 0 5 files changed, 21 insertions(+), 23 deletions(-) delete mode 100644 batch_producer_interceptor.go rename test/{stub-data => stubdata}/producer_interceptor_stub.go (100%) diff --git a/batch_producer_interceptor.go b/batch_producer_interceptor.go deleted file mode 100644 index e00b6dc..0000000 --- a/batch_producer_interceptor.go +++ /dev/null @@ -1,14 +0,0 @@ -package kafka - -import ( - "context" -) - -type BatchProducerInterceptorContext struct { - Context context.Context - Message *Message -} - -type BatchProducerInterceptor interface { - OnProduce(ctx context.Context, msg Message) -} diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index 11ad868..fc82150 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -31,7 +31,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }) + }, nil) defer producer.Close() go func() { diff --git a/producer_test.go b/producer_test.go index 02cfb87..90bd7dd 100644 --- a/producer_test.go +++ b/producer_test.go @@ -2,7 +2,6 @@ package kafka import ( "context" - stubData "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" "github.com/gofiber/fiber/v2/utils" "testing" @@ -30,7 +29,7 @@ func Test_producer_Produce_interceptor_Successfully(t *testing.T) { Key: "x-correlation-id", Value: []byte(utils.UUIDv4()), }) - interceptor := stubData.NewMockProducerInterceptor() + interceptor := newMockProducerInterceptor() p := producer{w: mw, interceptor: &interceptor} @@ -59,7 +58,7 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) { func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { // Given mw := &mockWriter{} - interceptor := stubData.NewMockProducerInterceptor() + interceptor := newMockProducerInterceptor() p := producer{w: mw, interceptor: &interceptor} // When @@ -92,3 +91,16 @@ func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) erro func (m *mockWriter) Close() error { return nil } + +type mockProducerInterceptor struct{} + +func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "test", + Value: []byte("test"), + }) +} + +func newMockProducerInterceptor() ProducerInterceptor { + return &mockProducerInterceptor{} +} diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 3adea00..2d4306f 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -6,7 +6,7 @@ import ( "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" - stub_data "github.com/Trendyol/kafka-konsumer/v2/test/stub-data" + stubdata "github.com/Trendyol/kafka-konsumer/v2/test/stubdata" segmentio "github.com/segmentio/kafka-go" "testing" "time" @@ -46,7 +46,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { // Given topic := "produce-interceptor-topic" consumerGroup := "produce-topic-cg" - interceptor := stub_data.NewMockProducerInterceptor() + interceptor := stubdata.NewMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, @@ -94,10 +94,10 @@ func Test_Should_Produce_Successfully(t *testing.T) { if len(actual.Headers) != 1 { t.Fatalf("Header size does not equal %d", len(actual.Headers)) } - if string(actual.Headers[0].Key) != stub_data.XSourceAppKey { + if string(actual.Headers[0].Key) != stubdata.XSourceAppKey { t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) } - if string(actual.Headers[0].Value) != stub_data.XSourceAppValue { + if string(actual.Headers[0].Value) != stubdata.XSourceAppValue { t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) } }) @@ -133,7 +133,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }) t.Run("with interceptor", func(t *testing.T) { - interceptor := stub_data.NewMockProducerInterceptor() + interceptor := stubdata.NewMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor) diff --git a/test/stub-data/producer_interceptor_stub.go b/test/stubdata/producer_interceptor_stub.go similarity index 100% rename from test/stub-data/producer_interceptor_stub.go rename to test/stubdata/producer_interceptor_stub.go From a7a5daf135fd38be238dd9fa95a307a8a340bbb9 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 19 Sep 2024 18:43:46 +0300 Subject: [PATCH 4/7] fix: linter --- producer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/producer_test.go b/producer_test.go index 90bd7dd..e02c34d 100644 --- a/producer_test.go +++ b/producer_test.go @@ -2,9 +2,10 @@ package kafka import ( "context" - "github.com/gofiber/fiber/v2/utils" "testing" + "github.com/gofiber/fiber/v2/utils" + "github.com/segmentio/kafka-go" ) @@ -35,7 +36,6 @@ func Test_producer_Produce_interceptor_Successfully(t *testing.T) { // When err := p.Produce(context.Background(), msg) - // Then if err != nil { t.Fatalf("Producing err %s", err.Error()) From 8a39b9cc84e22b1f9f6bcb5fd0cd9f4ca0099f43 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Wed, 25 Sep 2024 10:35:33 +0300 Subject: [PATCH 5/7] feat: add multi interceptor support --- producer.go | 22 +++++++++----- producer_test.go | 8 ++--- test/integration/integration_test.go | 35 ++++++++++++++++------ test/stubdata/producer_interceptor_stub.go | 21 ------------- 4 files changed, 44 insertions(+), 42 deletions(-) delete mode 100644 test/stubdata/producer_interceptor_stub.go diff --git a/producer.go b/producer.go index 3a51574..640b919 100644 --- a/producer.go +++ b/producer.go @@ -18,11 +18,11 @@ type Writer interface { } type producer struct { - w Writer - interceptor *ProducerInterceptor + w Writer + interceptors []ProducerInterceptor } -func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) { +func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(cfg.Writer.Brokers...), Topic: cfg.Writer.Topic, @@ -52,7 +52,7 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Produce kafkaWriter.Transport = transport } - p := &producer{w: kafkaWriter, interceptor: interceptor} + p := &producer{w: kafkaWriter, interceptors: interceptors} if cfg.DistributedTracingEnabled { otelWriter, err := NewOtelProducer(cfg, kafkaWriter) @@ -66,8 +66,8 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Produce } func (p *producer) Produce(ctx context.Context, message Message) error { - if p.interceptor != nil { - (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message}) + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &message) } return p.w.WriteMessages(ctx, message.toKafkaMessage()) @@ -76,8 +76,8 @@ func (p *producer) Produce(ctx context.Context, message Message) error { func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { kafkaMessages := make([]kafka.Message, 0, len(messages)) for i := range messages { - if p.interceptor != nil { - (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]}) + if len(p.interceptors) > 0 { + p.executeInterceptors(ctx, &messages[i]) } kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage()) @@ -86,6 +86,12 @@ func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error { return p.w.WriteMessages(ctx, kafkaMessages...) } +func (p *producer) executeInterceptors(ctx context.Context, message *Message) { + for _, interceptor := range p.interceptors { + interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message}) + } +} + func (p *producer) Close() error { return p.w.Close() } diff --git a/producer_test.go b/producer_test.go index e02c34d..452b6df 100644 --- a/producer_test.go +++ b/producer_test.go @@ -32,7 +32,7 @@ func Test_producer_Produce_interceptor_Successfully(t *testing.T) { }) interceptor := newMockProducerInterceptor() - p := producer{w: mw, interceptor: &interceptor} + p := producer{w: mw, interceptors: interceptor} // When err := p.Produce(context.Background(), msg) @@ -59,7 +59,7 @@ func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) { // Given mw := &mockWriter{} interceptor := newMockProducerInterceptor() - p := producer{w: mw, interceptor: &interceptor} + p := producer{w: mw, interceptors: interceptor} // When err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}}) @@ -101,6 +101,6 @@ func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) { }) } -func newMockProducerInterceptor() ProducerInterceptor { - return &mockProducerInterceptor{} +func newMockProducerInterceptor() []ProducerInterceptor { + return []ProducerInterceptor{&mockProducerInterceptor{}} } diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go index 2d4306f..a11a4d0 100644 --- a/test/integration/integration_test.go +++ b/test/integration/integration_test.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "github.com/Trendyol/kafka-konsumer/v2" - stubdata "github.com/Trendyol/kafka-konsumer/v2/test/stubdata" segmentio "github.com/segmentio/kafka-go" "testing" "time" @@ -28,7 +27,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { topic, }, }, - }, nil) + }) // When err := producer.Produce(context.Background(), kafka.Message{ @@ -46,7 +45,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { // Given topic := "produce-interceptor-topic" consumerGroup := "produce-topic-cg" - interceptor := stubdata.NewMockProducerInterceptor() + interceptor := newMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}, @@ -55,7 +54,7 @@ func Test_Should_Produce_Successfully(t *testing.T) { topic, }, }, - }, &interceptor) + }, interceptor...) // When err := producer.Produce(context.Background(), kafka.Message{ @@ -94,10 +93,10 @@ func Test_Should_Produce_Successfully(t *testing.T) { if len(actual.Headers) != 1 { t.Fatalf("Header size does not equal %d", len(actual.Headers)) } - if string(actual.Headers[0].Key) != stubdata.XSourceAppKey { + if string(actual.Headers[0].Key) != xSourceAppKey { t.Fatalf("Header key does not equal %s", actual.Headers[0].Key) } - if string(actual.Headers[0].Value) != stubdata.XSourceAppValue { + if string(actual.Headers[0].Value) != xSourceAppValue { t.Fatalf("Header value does not equal %s", actual.Headers[0].Value) } }) @@ -121,7 +120,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { t.Run("without interceptor", func(t *testing.T) { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil) + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}) // When err := producer.ProduceBatch(context.Background(), msgs) @@ -133,10 +132,10 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) { }) t.Run("with interceptor", func(t *testing.T) { - interceptor := stubdata.NewMockProducerInterceptor() + interceptors := newMockProducerInterceptor() producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ - Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor) + Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, interceptors...) // When err := producer.ProduceBatch(context.Background(), msgs) @@ -643,3 +642,21 @@ func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration } } } + +type mockProducerInterceptor struct{} + +const ( + xSourceAppKey = "x-source-app" + xSourceAppValue = "kafka-konsumer" +) + +func (i *mockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: xSourceAppKey, + Value: []byte(xSourceAppValue), + }) +} + +func newMockProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&mockProducerInterceptor{}} +} diff --git a/test/stubdata/producer_interceptor_stub.go b/test/stubdata/producer_interceptor_stub.go deleted file mode 100644 index a82ae5e..0000000 --- a/test/stubdata/producer_interceptor_stub.go +++ /dev/null @@ -1,21 +0,0 @@ -package test - -import "github.com/Trendyol/kafka-konsumer/v2" - -type MockProducerInterceptor struct{} - -const ( - XSourceAppKey = "x-source-app" - XSourceAppValue = "kafka-konsumer" -) - -func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { - ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ - Key: XSourceAppKey, - Value: []byte(XSourceAppValue), - }) -} - -func NewMockProducerInterceptor() kafka.ProducerInterceptor { - return &MockProducerInterceptor{} -} From 748e2824d7c64374d03b4e9268642e0e693695c5 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Wed, 25 Sep 2024 10:40:34 +0300 Subject: [PATCH 6/7] fix: refactoring code --- examples/with-deadletter/main.go | 2 +- examples/with-grafana/main.go | 2 +- examples/with-kafka-producer/main.go | 2 +- examples/with-kafka-transactional-retry-disabled/main.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/with-deadletter/main.go b/examples/with-deadletter/main.go index 29ed729..7c7f1dc 100644 --- a/examples/with-deadletter/main.go +++ b/examples/with-deadletter/main.go @@ -21,7 +21,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }, nil) + }) _ = producer.Produce(context.Background(), kafka.Message{ Topic: topicName, diff --git a/examples/with-grafana/main.go b/examples/with-grafana/main.go index fc82150..11ad868 100644 --- a/examples/with-grafana/main.go +++ b/examples/with-grafana/main.go @@ -31,7 +31,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }, nil) + }) defer producer.Close() go func() { diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index 67f66b9..f4522f5 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -10,7 +10,7 @@ func main() { Writer: kafka.WriterConfig{ Brokers: []string{"localhost:29092"}, }, - }, nil) + }) const topicName = "standart-topic" diff --git a/examples/with-kafka-transactional-retry-disabled/main.go b/examples/with-kafka-transactional-retry-disabled/main.go index d7ba9fa..09f0429 100644 --- a/examples/with-kafka-transactional-retry-disabled/main.go +++ b/examples/with-kafka-transactional-retry-disabled/main.go @@ -13,7 +13,7 @@ import ( func main() { producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"}, - }, nil) + }) producer.ProduceBatch(context.Background(), []kafka.Message{ {Key: []byte("key1"), Value: []byte("message1")}, From 88e35827de09f23f1912a3e5f3e6803bc1833517 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 26 Sep 2024 13:24:58 +0300 Subject: [PATCH 7/7] docs: add producer interceptor example --- README.md | 4 ++ .../with-kafka-producer-interceptor/main.go | 38 +++++++++++++++++++ .../producer-interceptor.go | 16 ++++++++ 3 files changed, 58 insertions(+) create mode 100644 examples/with-kafka-producer-interceptor/main.go create mode 100644 examples/with-kafka-producer-interceptor/producer-interceptor.go diff --git a/README.md b/README.md index 1c71c86..f08eab8 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want. +#### With Producer Interceptor + +Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor) + #### With Distributed Tracing Support Please refer to [Tracing Example](examples/with-tracing/README.md) diff --git a/examples/with-kafka-producer-interceptor/main.go b/examples/with-kafka-producer-interceptor/main.go new file mode 100644 index 0000000..9005712 --- /dev/null +++ b/examples/with-kafka-producer-interceptor/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" +) + +func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }, newProducerInterceptor()...) + + const topicName = "standart-topic" + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) + + fmt.Println("Messages sended...!") +} diff --git a/examples/with-kafka-producer-interceptor/producer-interceptor.go b/examples/with-kafka-producer-interceptor/producer-interceptor.go new file mode 100644 index 0000000..c8a9d1c --- /dev/null +++ b/examples/with-kafka-producer-interceptor/producer-interceptor.go @@ -0,0 +1,16 @@ +package main + +import "github.com/Trendyol/kafka-konsumer/v2" + +type producerInterceptor struct{} + +func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "x-source-app", + Value: []byte("kafka-konsumer"), + }) +} + +func newProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&producerInterceptor{}} +}