Skip to content

Commit

Permalink
feat: add producer interceptor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ademekici committed Sep 19, 2024
1 parent a40194d commit c45b273
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 41 deletions.
14 changes: 14 additions & 0 deletions batch_producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka

import (
"context"
)

type BatchProducerInterceptorContext struct {
Context context.Context
Message *Message
}

type BatchProducerInterceptor interface {
OnProduce(ctx context.Context, msg Message)
}
2 changes: 1 addition & 1 deletion examples/with-deadletter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func main() {
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
}, nil)

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Expand Down
2 changes: 1 addition & 1 deletion examples/with-kafka-producer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func main() {
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
})
}, nil)

const topicName = "standart-topic"

Expand Down
2 changes: 1 addition & 1 deletion examples/with-kafka-transactional-retry-disabled/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func main() {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{Brokers: []string{"localhost:29092"}, Topic: "standart-topic"},
})
}, nil)

producer.ProduceBatch(context.Background(), []kafka.Message{
{Key: []byte("key1"), Value: []byte("message1")},
Expand Down
28 changes: 19 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type Writer interface {
}

type producer struct {
w Writer
w Writer
interceptor *ProducerInterceptor
}

func NewProducer(cfg *ProducerConfig) (Producer, error) {
func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Writer.Brokers...),
Topic: cfg.Writer.Topic,
Expand Down Expand Up @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
kafkaWriter.Transport = transport
}

p := &producer{w: kafkaWriter}
p := &producer{w: kafkaWriter, interceptor: interceptor}

if cfg.DistributedTracingEnabled {
otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
Expand All @@ -64,18 +65,27 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
return p, nil
}

func (c *producer) Produce(ctx context.Context, message Message) error {
return c.w.WriteMessages(ctx, message.toKafkaMessage())
func (p *producer) Produce(ctx context.Context, message Message) error {
if p.interceptor != nil {
(*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message})
}

return p.w.WriteMessages(ctx, message.toKafkaMessage())
}

func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
kafkaMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
if p.interceptor != nil {
(*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]})
}

kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
}
return c.w.WriteMessages(ctx, kafkaMessages...)

return p.w.WriteMessages(ctx, kafkaMessages...)
}

func (c *producer) Close() error {
return c.w.Close()
func (p *producer) Close() error {
return p.w.Close()
}
14 changes: 14 additions & 0 deletions producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka

import (
"context"
)

type ProducerInterceptorContext struct {
Context context.Context
Message *Message
}

type ProducerInterceptor interface {
OnProduce(ctx ProducerInterceptorContext)
}
39 changes: 38 additions & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package kafka

import (
"context"
stubData "github.com/Trendyol/kafka-konsumer/v2/test/stub-data"
"github.com/gofiber/fiber/v2/utils"
"testing"

"github.com/segmentio/kafka-go"
Expand All @@ -20,6 +22,27 @@ func Test_producer_Produce_Successfully(t *testing.T) {
}
}

func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
msg := Message{Headers: make([]Header, 0)}
msg.Headers = append(msg.Headers, kafka.Header{
Key: "x-correlation-id",
Value: []byte(utils.UUIDv4()),
})
interceptor := stubData.NewMockProducerInterceptor()

p := producer{w: mw, interceptor: &interceptor}

// When
err := p.Produce(context.Background(), msg)

// Then
if err != nil {
t.Fatalf("Producing err %s", err.Error())
}
}

func Test_producer_ProduceBatch_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -33,6 +56,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
}
}

func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
interceptor := stubData.NewMockProducerInterceptor()
p := producer{w: mw, interceptor: &interceptor}

// When
err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
// Then
if err != nil {
t.Fatalf("Batch Producing err %s", err.Error())
}
}

func Test_producer_Close_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -48,7 +85,7 @@ func Test_producer_Close_Successfully(t *testing.T) {

type mockWriter struct{}

func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/integration/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3.8"
services:
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-amd64 #for m1 => v23.3.9-arm64
image: docker.redpanda.com/redpandadata/redpanda:v23.3.9-arm64 #for m1 => v23.3.9-arm64
container_name: redpanda-1
command:
- redpanda
Expand Down
134 changes: 107 additions & 27 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
stub_data "github.com/Trendyol/kafka-konsumer/v2/test/stub-data"
segmentio "github.com/segmentio/kafka-go"
"testing"
"time"
Expand All @@ -14,40 +15,99 @@ import (
func Test_Should_Produce_Successfully(t *testing.T) {
// Given
t.Parallel()
topic := "produce-topic"
brokerAddress := "localhost:9092"

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
t.Run("without interceptor", func(t *testing.T) {
//Given

topic := "produce-topic"
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
},
},
},
})
}, nil)

// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
})

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
t.Run("with interceptor", func(t *testing.T) {
// Given
topic := "produce-interceptor-topic"
consumerGroup := "produce-topic-cg"
interceptor := stub_data.NewMockProducerInterceptor()

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
Transport: &kafka.TransportConfig{
MetadataTopics: []string{
topic,
},
},
}, &interceptor)

// When
err := producer.Produce(context.Background(), kafka.Message{
Key: []byte("1"),
Value: []byte(`foo`),
})

messageCh := make(chan *kafka.Message)

consumerCfg := &kafka.ConsumerConfig{
Reader: kafka.ReaderConfig{Brokers: []string{brokerAddress}, Topic: topic, GroupID: consumerGroup},
ConsumeFn: func(message *kafka.Message) error {
messageCh <- message
return nil
},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
defer consumer.Stop()

consumer.Consume()

// Then

if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}

actual := <-messageCh
if string(actual.Value) != "foo" {
t.Fatalf("Value does not equal %s", actual.Value)
}
if string(actual.Key) != "1" {
t.Fatalf("Key does not equal %s", actual.Key)
}
if len(actual.Headers) != 1 {
t.Fatalf("Header size does not equal %d", len(actual.Headers))
}
if string(actual.Headers[0].Key) != stub_data.XSourceAppKey {
t.Fatalf("Header key does not equal %s", actual.Headers[0].Key)
}
if string(actual.Headers[0].Value) != stub_data.XSourceAppValue {
t.Fatalf("Header value does not equal %s", actual.Headers[0].Value)
}
})
}

func Test_Should_Batch_Produce_Successfully(t *testing.T) {
// Given
t.Parallel()
topic := "batch-produce-topic"
brokerAddress := "localhost:9092"

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}})

// When
msgs := []kafka.Message{
{
Key: []byte("1"),
Expand All @@ -59,13 +119,33 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) {
},
}

// When
err := producer.ProduceBatch(context.Background(), msgs)
t.Run("without interceptor", func(t *testing.T) {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil)

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
// When
err := producer.ProduceBatch(context.Background(), msgs)

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})

t.Run("with interceptor", func(t *testing.T) {
interceptor := stub_data.NewMockProducerInterceptor()

producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor)

// When
err := producer.ProduceBatch(context.Background(), msgs)

// Then
if err != nil {
t.Fatalf("Error while producing err %s", err.Error())
}
})
}

func Test_Should_Consume_Message_Successfully(t *testing.T) {
Expand Down
21 changes: 21 additions & 0 deletions test/stub-data/producer_interceptor_stub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package test

import "github.com/Trendyol/kafka-konsumer/v2"

type MockProducerInterceptor struct{}

const (
XSourceAppKey = "x-source-app"
XSourceAppValue = "kafka-konsumer"
)

func (i *MockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: XSourceAppKey,
Value: []byte(XSourceAppValue),
})
}

func NewMockProducerInterceptor() kafka.ProducerInterceptor {
return &MockProducerInterceptor{}
}

0 comments on commit c45b273

Please sign in to comment.