diff --git a/README.md b/README.md index 1c71c86..f08eab8 100644 --- a/README.md +++ b/README.md @@ -196,6 +196,10 @@ After running `docker-compose up` command, you can run any application you want. +#### With Producer Interceptor + +Please refer to [Producer Interceptor Example](examples/with-kafka-producer-interceptor) + #### With Distributed Tracing Support Please refer to [Tracing Example](examples/with-tracing/README.md) diff --git a/examples/with-kafka-producer-interceptor/main.go b/examples/with-kafka-producer-interceptor/main.go new file mode 100644 index 0000000..9005712 --- /dev/null +++ b/examples/with-kafka-producer-interceptor/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "context" + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" +) + +func main() { + producer, _ := kafka.NewProducer(&kafka.ProducerConfig{ + Writer: kafka.WriterConfig{ + Brokers: []string{"localhost:29092"}, + }, + }, newProducerInterceptor()...) + + const topicName = "standart-topic" + + _ = producer.Produce(context.Background(), kafka.Message{ + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) + + fmt.Println("Messages sended...!") +} diff --git a/examples/with-kafka-producer-interceptor/producer-interceptor.go b/examples/with-kafka-producer-interceptor/producer-interceptor.go new file mode 100644 index 0000000..c8a9d1c --- /dev/null +++ b/examples/with-kafka-producer-interceptor/producer-interceptor.go @@ -0,0 +1,16 @@ +package main + +import "github.com/Trendyol/kafka-konsumer/v2" + +type producerInterceptor struct{} + +func (i *producerInterceptor) OnProduce(ctx kafka.ProducerInterceptorContext) { + ctx.Message.Headers = append(ctx.Message.Headers, kafka.Header{ + Key: "x-source-app", + Value: []byte("kafka-konsumer"), + }) +} + +func newProducerInterceptor() []kafka.ProducerInterceptor { + return []kafka.ProducerInterceptor{&producerInterceptor{}} +}