Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add producer interceptor interface #145

Merged
merged 7 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want.

</details>

#### With Producer Interceptor

Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor)

#### With Distributed Tracing Support

Please refer to [Tracing Example](examples/with-tracing/README.md)
Expand Down
38 changes: 38 additions & 0 deletions examples/with-kafka-producer-interceptor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"context"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
}, newProducerInterceptor()...)

const topicName = "standart-topic"

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
})

_ = producer.ProduceBatch(context.Background(), []kafka.Message{
{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
},
{
Topic: topicName,
Key: []byte("2"),
Value: []byte(`{ "foo2": "bar2" }`),
},
})

fmt.Println("Messages sended...!")
}
16 changes: 16 additions & 0 deletions examples/with-kafka-producer-interceptor/producer-interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import "github.com/Trendyol/kafka-konsumer/v2"

type producerInterceptor struct{}

func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: "x-source-app",
Value: []byte("kafka-konsumer"),
})
}

func newProducerInterceptor() []kafka.ProducerInterceptor {
return []kafka.ProducerInterceptor{&producerInterceptor{}}
}
34 changes: 25 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type Writer interface {
}

type producer struct {
w Writer
w Writer
interceptors []ProducerInterceptor
}

func NewProducer(cfg *ProducerConfig) (Producer, error) {
func NewProducer(cfg *ProducerConfig, interceptors ...ProducerInterceptor) (Producer, error) {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Writer.Brokers...),
Topic: cfg.Writer.Topic,
Expand Down Expand Up @@ -51,7 +52,7 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
kafkaWriter.Transport = transport
}

p := &producer{w: kafkaWriter}
p := &producer{w: kafkaWriter, interceptors: interceptors}

if cfg.DistributedTracingEnabled {
otelWriter, err := NewOtelProducer(cfg, kafkaWriter)
Expand All @@ -64,18 +65,33 @@ func NewProducer(cfg *ProducerConfig) (Producer, error) {
return p, nil
}

func (c *producer) Produce(ctx context.Context, message Message) error {
return c.w.WriteMessages(ctx, message.toKafkaMessage())
func (p *producer) Produce(ctx context.Context, message Message) error {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &message)
}

return p.w.WriteMessages(ctx, message.toKafkaMessage())
}

func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error {
func (p *producer) ProduceBatch(ctx context.Context, messages []Message) error {
kafkaMessages := make([]kafka.Message, 0, len(messages))
for i := range messages {
if len(p.interceptors) > 0 {
p.executeInterceptors(ctx, &messages[i])
}

kafkaMessages = append(kafkaMessages, messages[i].toKafkaMessage())
}
return c.w.WriteMessages(ctx, kafkaMessages...)

return p.w.WriteMessages(ctx, kafkaMessages...)
}

func (p *producer) executeInterceptors(ctx context.Context, message *Message) {
for _, interceptor := range p.interceptors {
interceptor.OnProduce(ProducerInterceptorContext{Context: ctx, Message: message})
}
}

func (c *producer) Close() error {
return c.w.Close()
func (p *producer) Close() error {
return p.w.Close()
}
14 changes: 14 additions & 0 deletions producer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package kafka

import (
"context"
)

type ProducerInterceptorContext struct {
Context context.Context
Message *Message
}

type ProducerInterceptor interface {
OnProduce(ctx ProducerInterceptorContext)
}
51 changes: 50 additions & 1 deletion producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"github.com/gofiber/fiber/v2/utils"

"github.com/segmentio/kafka-go"
)

Expand All @@ -20,6 +22,26 @@ func Test_producer_Produce_Successfully(t *testing.T) {
}
}

func Test_producer_Produce_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
msg := Message{Headers: make([]Header, 0)}
msg.Headers = append(msg.Headers, kafka.Header{
Key: "x-correlation-id",
Value: []byte(utils.UUIDv4()),
})
interceptor := newMockProducerInterceptor()

p := producer{w: mw, interceptors: interceptor}

// When
err := p.Produce(context.Background(), msg)
// Then
ademekici marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("Producing err %s", err.Error())
}
}

func Test_producer_ProduceBatch_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -33,6 +55,20 @@ func Test_producer_ProduceBatch_Successfully(t *testing.T) {
}
}

func Test_producer_ProduceBatch_interceptor_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
interceptor := newMockProducerInterceptor()
p := producer{w: mw, interceptors: interceptor}

// When
err := p.ProduceBatch(context.Background(), []Message{{}, {}, {}})
// Then
if err != nil {
t.Fatalf("Batch Producing err %s", err.Error())
}
}

func Test_producer_Close_Successfully(t *testing.T) {
// Given
mw := &mockWriter{}
Expand All @@ -48,10 +84,23 @@ func Test_producer_Close_Successfully(t *testing.T) {

type mockWriter struct{}

func (m *mockWriter) WriteMessages(_ context.Context, _ ...kafka.Message) error {
func (m *mockWriter) WriteMessages(_ context.Context, msg ...kafka.Message) error {
return nil
}

func (m *mockWriter) Close() error {
return nil
}

type mockProducerInterceptor struct{}

func (i *mockProducerInterceptor) OnProduce(ctx ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: "test",
Value: []byte("test"),
})
}

func newMockProducerInterceptor() []ProducerInterceptor {
return []ProducerInterceptor{&mockProducerInterceptor{}}
}
Loading
Loading