From 88e35827de09f23f1912a3e5f3e6803bc1833517 Mon Sep 17 00:00:00 2001 From: "adem.ekici" Date: Thu, 26 Sep 2024 13:24:58 +0300 Subject: [PATCH] docs: add producer interceptor example --- README.md | 4 ++ .../with-kafka-producer-interceptor/main.go | 38 +++++++++++++++++++ .../producer-interceptor.go | 16 ++++++++ 3 files changed, 58 insertions(+) create mode 100644 examples/with-kafka-producer-interceptor/main.go create mode 100644 examples/with-kafka-producer-interceptor/producer-interceptor.go diff --git a/README.md b/README.md index 1c71c86..f08eab8 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want. +#### With Producer Interceptor + +Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor) + #### With Distributed Tracing Support Please refer to [Tracing Example](examples/with-tracing/README.md) diff --git a/examples/with-kafka-producer-interceptor/main.go b/examples/with-kafka-producer-interceptor/main.go new file mode 100644 index 0000000..9005712 --- /dev/null +++ b/examples/with-kafka-producer-interceptor/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" +) + +func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }, newProducerInterceptor()...) + + const topicName = "standart-topic" + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) + + fmt.Println("Messages sended...!") +} diff --git a/examples/with-kafka-producer-interceptor/producer-interceptor.go b/examples/with-kafka-producer-interceptor/producer-interceptor.go new file mode 100644 index 0000000..c8a9d1c --- /dev/null +++ b/examples/with-kafka-producer-interceptor/producer-interceptor.go @@ -0,0 +1,16 @@ +package main + +import "github.com/Trendyol/kafka-konsumer/v2" + +type producerInterceptor struct{} + +func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "x-source-app", + Value: []byte("kafka-konsumer"), + }) +} + +func newProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&producerInterceptor{}} +}