Skip to content

Commit

Permalink
docs: add producer interceptor example
Browse files Browse the repository at this point in the history
  • Loading branch information
ademekici committed Sep 26, 2024
1 parent 748e282 commit 88e3582
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 0 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want.

</details>

#### With Producer Interceptor

Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor)

#### With Distributed Tracing Support

Please refer to [Tracing Example](examples/with-tracing/README.md)
Expand Down
38 changes: 38 additions & 0 deletions examples/with-kafka-producer-interceptor/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"context"
"fmt"
"github.com/Trendyol/kafka-konsumer/v2"
)

func main() {
producer, _ := kafka.NewProducer(&kafka.ProducerConfig{
Writer: kafka.WriterConfig{
Brokers: []string{"localhost:29092"},
},
}, newProducerInterceptor()...)

const topicName = "standart-topic"

_ = producer.Produce(context.Background(), kafka.Message{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
})

_ = producer.ProduceBatch(context.Background(), []kafka.Message{
{
Topic: topicName,
Key: []byte("1"),
Value: []byte(`{ "foo": "bar" }`),
},
{
Topic: topicName,
Key: []byte("2"),
Value: []byte(`{ "foo2": "bar2" }`),
},
})

fmt.Println("Messages sended...!")
}
16 changes: 16 additions & 0 deletions examples/with-kafka-producer-interceptor/producer-interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import "github.com/Trendyol/kafka-konsumer/v2"

type producerInterceptor struct{}

func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) {
ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{
Key: "x-source-app",
Value: []byte("kafka-konsumer"),
})
}

func newProducerInterceptor() []kafka.ProducerInterceptor {
return []kafka.ProducerInterceptor{&producerInterceptor{}}
}

0 comments on commit 88e3582

Please sign in to comment.