diff --git a/examples/with-kafka-producer/main.go b/examples/with-kafka-producer/main.go index 4f1e02e..edaee16 100644 --- a/examples/with-kafka-producer/main.go +++ b/examples/with-kafka-producer/main.go @@ -12,9 +12,24 @@ func main() { }, }) + const topicName = "standart-topic" + _ = producer.Produce(context.Background(), kafka.Message{ - Topic: "standart-topic", + Topic: topicName, Key: []byte("1"), Value: []byte(`{ "foo": "bar" }`), }) + + _ = producer.ProduceBatch(context.Background(), []kafka.Message{ + { + Topic: topicName, + Key: []byte("1"), + Value: []byte(`{ "foo": "bar" }`), + }, + { + Topic: topicName, + Key: []byte("2"), + Value: []byte(`{ "foo2": "bar2" }`), + }, + }) } diff --git a/producer.go b/producer.go index 4564e6c..13e1b35 100644 --- a/producer.go +++ b/producer.go @@ -8,6 +8,7 @@ import ( type Producer interface { Produce(ctx context.Context, message Message) error + ProduceBatch(ctx context.Context, messages []Message) error Close() error } @@ -28,6 +29,15 @@ func (c *producer) Produce(ctx context.Context, message Message) error { return c.w.WriteMessages(ctx, kafka.Message(message)) } +func (c *producer) ProduceBatch(ctx context.Context, messages []Message) error { + kafkaMessages := make([]kafka.Message, 0, len(messages)) + for i := range messages { + convertedMessage := kafka.Message(messages[i]) + kafkaMessages = append(kafkaMessages, convertedMessage) + } + return c.w.WriteMessages(ctx, kafkaMessages...) +} + func (c *producer) Close() error { return c.w.Close() }