feat: add multi interceptor support
ademekici committed Sep 25, 2024
1 parent a7a5daf commit 8a39b9c
Showing 4 changed files with 44 additions and 42 deletions.
22 changes: 14 additions & 8 deletions producer.go
@@ -18,11 +18,11 @@ type Writer interface {
 }
 
 type producer struct {
-    w           Writer
-    interceptor *ProducerInterceptor
+    w            Writer
+    interceptors []ProducerInterceptor
 }
 
-func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) {
+func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
     kafkaWriter := &kafka.Writer{
         Addr:  kafka.TCP(cfg.Writer.Brokers...),
         Topic: cfg.Writer.Topic,
@@ -52,7 +52,7 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) {
         kafkaWriter.Transport = transport
     }
 
-    p := &producer{w: kafkaWriter, interceptor: interceptor}
+    p := &producer{w: kafkaWriter, interceptors: interceptors}
 
     if cfg.DistributedTracingEnabled {
         otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
@@ -66,8 +66,8 @@ func NewProducer(cfg *ProducerConfig, interceptor *ProducerInterceptor) (Producer, error) {
 }
 
 func (p *producer) Produce(ctx context.Context, message Message) error {
-    if p.interceptor != nil {
-        (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &message})
+    if len(p.interceptors) > 0 {
+        p.executeInterceptors(ctx, &message)
     }
 
     return p.w.WriteMessages(ctx, message.toKafkaMessage())
@@ -76,8 +76,8 @@ func (p *producer) Produce(ctx context.Context, message Message) error {
 func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
     kafkaMessages := make([]kafka.Message, 0, len(messages))
     for i := range messages {
-        if p.interceptor != nil {
-            (*p.interceptor).OnProduce(ProducerInterceptorContext{Context: ctx, Message: &messages[i]})
+        if len(p.interceptors) > 0 {
+            p.executeInterceptors(ctx, &messages[i])
         }
 
         kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
@@ -86,6 +86,12 @@ func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
     return p.w.WriteMessages(ctx, kafkaMessages...)
 }
 
+func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
+    for _, interceptor := range p.interceptors {
+        interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message})
+    }
+}
+
 func (p *producer) Close() error {
     return p.w.Close()
 }
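With this change, NewProducer accepts zero or more interceptors instead of a single *ProducerInterceptor, and executeInterceptors invokes them in the order they were passed. Below is a minimal usage sketch of the new signature; the two interceptor types, the broker address, and the topic name are illustrative assumptions, not part of this commit.

```go
package main

import (
	"context"
	"log"

	"github.com/Trendyol/kafka-konsumer/v2"
)

// headerInterceptor is a hypothetical interceptor that stamps a header on every message.
type headerInterceptor struct{}

func (headerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
		Key:   "x-source-app",
		Value: []byte("example-app"),
	})
}

// loggingInterceptor is a hypothetical interceptor that logs each produced message.
type loggingInterceptor struct{}

func (loggingInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
	log.Printf("producing message with %d header(s)", len(ctx.Message.Headers))
}

func main() {
	// Broker address and topic are placeholders.
	producer, err := kafka.NewProducer(&kafka.ProducerConfig{
		Writer: kafka.WriterConfig{
			Brokers: []string{"localhost:9092"},
			Topic:   "example-topic",
		},
	}, headerInterceptor{}, loggingInterceptor{}) // interceptors run in this order
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	if err := producer.Produce(context.Background(), kafka.Message{Value: []byte("hello")}); err != nil {
		log.Println("produce failed:", err)
	}
}
```

Existing call sites that passed nil or &interceptor (as in the tests below) migrate by dropping the argument or passing the interceptor values directly.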
8 changes: 4 additions & 4 deletions producer_test.go
@@ -32,7 +32,7 @@ func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
     })
     interceptor := newMockProducerInterceptor()
 
-    p := producer{w: mw, interceptor: &interceptor}
+    p := producer{w: mw, interceptors: interceptor}
 
     // When
     err := p.Produce(context.Background(), msg)
@@ -59,7 +59,7 @@ func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
     // Given
     mw := &mockWriter{}
     interceptor := newMockProducerInterceptor()
-    p := producer{w: mw, interceptor: &interceptor}
+    p := producer{w: mw, interceptors: interceptor}
 
     // When
     err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
@@ -101,6 +101,6 @@ func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) {
     })
 }
 
-func newMockProducerInterceptor() ProducerInterceptor {
-    return &mockProducerInterceptor{}
+func newMockProducerInterceptor() []ProducerInterceptor {
+    return []ProducerInterceptor{&mockProducerInterceptor{}}
 }
35 changes: 26 additions & 9 deletions test/integration/integration_test.go
@@ -6,7 +6,6 @@ import (
     "errors"
     "fmt"
     "github.com/Trendyol/kafka-konsumer/v2"
-    stubdata "github.com/Trendyol/kafka-konsumer/v2/test/stubdata"
     segmentio "github.com/segmentio/kafka-go"
     "testing"
     "time"
@@ -28,7 +27,7 @@ func Test_Should_Produce_Successfully(t *testing.T) {
                 topic,
             },
         },
-    }, nil)
+    })
 
     // When
     err := producer.Produce(context.Background(), kafka.Message{
@@ -46,7 +45,7 @@ func Test_Should_Produce_Successfully(t *testing.T) {
     // Given
     topic := "produce-interceptor-topic"
     consumerGroup := "produce-topic-cg"
-    interceptor := stubdata.NewMockProducerInterceptor()
+    interceptor := newMockProducerInterceptor()
 
     producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
         Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}},
@@ -55,7 +54,7 @@ func Test_Should_Produce_Successfully(t *testing.T) {
                 topic,
             },
         },
-    }, &interceptor)
+    }, interceptor...)
 
     // When
     err := producer.Produce(context.Background(), kafka.Message{
@@ -94,10 +93,10 @@ func Test_Should_Produce_Successfully(t *testing.T) {
         if len(actual.Headers) != 1 {
             t.Fatalf("Header size does not equal %d", len(actual.Headers))
         }
-        if string(actual.Headers[0].Key) != stubdata.XSourceAppKey {
+        if string(actual.Headers[0].Key) != xSourceAppKey {
             t.Fatalf("Header key does not equal %s", actual.Headers[0].Key)
         }
-        if string(actual.Headers[0].Value) != stubdata.XSourceAppValue {
+        if string(actual.Headers[0].Value) != xSourceAppValue {
             t.Fatalf("Header value does not equal %s", actual.Headers[0].Value)
         }
     })
@@ -121,7 +120,7 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) {
 
     t.Run("without interceptor", func(t *testing.T) {
         producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
-            Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, nil)
+            Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}})
 
         // When
         err := producer.ProduceBatch(context.Background(), msgs)
@@ -133,10 +132,10 @@ func Test_Should_Batch_Produce_Successfully(t *testing.T) {
     })
 
     t.Run("with interceptor", func(t *testing.T) {
-        interceptor := stubdata.NewMockProducerInterceptor()
+        interceptors := newMockProducerInterceptor()
 
         producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
-            Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, &interceptor)
+            Writer: kafka.WriterConfig{AllowAutoTopicCreation: true, Topic: topic, Brokers: []string{brokerAddress}}}, interceptors...)
 
         // When
         err := producer.ProduceBatch(context.Background(), msgs)
@@ -643,3 +642,21 @@ func assertEventually(t *testing.T, condition func() bool, waitFor time.Duration
         }
     }
 }
+
+type mockProducerInterceptor struct{}
+
+const (
+    xSourceAppKey   = "x-source-app"
+    xSourceAppValue = "kafka-konsumer"
+)
+
+func (i *mockProducerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
+    ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
+        Key:   xSourceAppKey,
+        Value: []byte(xSourceAppValue),
+    })
+}
+
+func newMockProducerInterceptor() []kafka.ProducerInterceptor {
+    return []kafka.ProducerInterceptor{&mockProducerInterceptor{}}
+}
21 changes: 0 additions & 21 deletions test/stubdata/producer_interceptor_stub.go

This file was deleted.
