From 04af6d09930359a7341bd0f6c68d42c505dcf22d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?U=C4=9Furcan=20Erdo=C4=9Fan?= Date: Thu, 24 Aug 2023 15:41:37 +0300 Subject: [PATCH] feat: batch producer implemented to konsumer (#16) (#24) * feat: batch producer implemented to konsumer (#16) * feat: batch producer example added (#16) --- examples/with-kafka-producer/main.go | 17 ++++++++++++++++- producer.go | 10 ++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index 4f1e02e..edaee16 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -12,9 +12,24 @@ func main() { }, }) + const topicName = "standart-topic" + _ = producer.Produce(context.Background(), kafka.Message{ - Topic: "standart-topic", + Topic: topicName, Key: []byte("1"), Value: []byte(`{ "foo": "bar" }`), }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) } diff --git a/producer.go b/producer.go index 4564e6c..13e1b35 100644 --- a/producer.go +++ b/producer.go @@ -8,6 +8,7 @@ import ( type Producer interface { Produce(ctx context.Context, message Message) error + ProduceBatch(ctx context.Context, messages []Message) error Close() error } @@ -28,6 +29,15 @@ func (c *producer) Produce(ctx context.Context, message Message) error { return c.w.WriteMessages(ctx, kafka.Message(message)) } +func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { + kafkaMessages := make([]kafka.Message, 0, len(messages)) + for i := range messages { + convertedMessage := kafka.Message(messages[i]) + kafkaMessages = append(kafkaMessages, convertedMessage) + } + return c.w.WriteMessages(ctx, kafkaMessages...) +} + func (c *producer) Close() error { return c.w.Close() }