From d70a4fd86c09ed2b8f6a019631721b2d67182505 Mon Sep 17 00:00:00 2001 From: Wil Simpson Date: Wed, 11 Dec 2024 15:04:59 -0500 Subject: [PATCH] feat: added method to publish many messages --- pkg/bus/bus.go | 1 + pkg/bus/bus_kafka.go | 1 + pkg/bus/bus_kafka_writer.go | 98 ++++++++++++++++++++++++++++++------- 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 6093986..28dd439 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -27,5 +27,6 @@ type MessageBusReader[T BusMessage[any]] interface { type MessageBusWriter[T BusMessage[any]] interface { GetMessageType() BusMessageType Publish(context.Context, T) error + PublishMany(context.Context, []T) error Close() error } diff --git a/pkg/bus/bus_kafka.go b/pkg/bus/bus_kafka.go index c993431..c283f86 100644 --- a/pkg/bus/bus_kafka.go +++ b/pkg/bus/bus_kafka.go @@ -10,6 +10,7 @@ import ( var ( ErrSerializeMessage = errors.New("unable to serialize message") ErrSendingMessage = errors.New("unable to send message on bus") + ErrEncodingMessage = errors.New("unable to encode message") ) type kafkaBus[T BusMessage[any]] struct { diff --git a/pkg/bus/bus_kafka_writer.go b/pkg/bus/bus_kafka_writer.go index eb17121..83f2922 100644 --- a/pkg/bus/bus_kafka_writer.go +++ b/pkg/bus/bus_kafka_writer.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "encoding/gob" + "errors" "fmt" + "sync" "github.com/ShatteredRealms/go-common-service/pkg/config" "github.com/ShatteredRealms/go-common-service/pkg/log" @@ -19,30 +21,18 @@ type kafkaBusWriter[T BusMessage[any]] struct { // Publish implements MessageBus. func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error { - k.mu.Lock() - if k.Writer == nil { - k.Writer = kafka.NewWriter(kafka.WriterConfig{ - Brokers: k.brokers.Addresses(), - Topic: k.topic, - Balancer: &kafka.LeastBytes{}, - Async: true, - Logger: kafka.LoggerFunc(log.Logger.Tracef), - }) - k.Writer.AllowAutoTopicCreation = true - } - k.mu.Unlock() + k.setupWriter() - k.wg.Add(1) - defer k.wg.Done() - var buf bytes.Buffer - err := gob.NewEncoder(&buf).Encode(msg) + val, err := k.encodeMessage(msg) if err != nil { - return fmt.Errorf("%w: %w", ErrSerializeMessage, err) + return err } + k.wg.Add(1) + defer k.wg.Done() err = k.Writer.WriteMessages(ctx, kafka.Message{ Key: []byte(msg.GetId()), - Value: buf.Bytes(), + Value: val, }) if err != nil { return fmt.Errorf("%w: %w", ErrSendingMessage, err) @@ -51,20 +41,92 @@ func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error { return nil } +func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error { + k.wg.Add(1) + defer k.wg.Done() + + k.setupWriter() + + messages := make([]kafka.Message, len(msgs)) + var errs error + errsMu := sync.Mutex{} + wg := sync.WaitGroup{} + + wg.Add(len(msgs)) + for idx, msg := range msgs { + go func(msg T) { + defer k.wg.Done() + val, err := k.encodeMessage(msg) + if err != nil { + errsMu.Lock() + errs = errors.Join(errs, fmt.Errorf("%w: %w", ErrEncodingMessage, err)) + errsMu.Unlock() + return + } + + messages[idx] = kafka.Message{ + Key: []byte(msg.GetId()), + Value: val, + } + }(msg) + } + + wg.Wait() + if errs != nil { + return errs + } + + err := k.Writer.WriteMessages(ctx, messages...) + if err != nil { + return fmt.Errorf("%w: %w", ErrSendingMessage, err) + } + + return nil +} + func (k *kafkaBusWriter[T]) GetMessageType() BusMessageType { return BusMessageType(k.topic) } func (k *kafkaBusWriter[T]) Close() error { k.wg.Wait() + + k.mu.Lock() + defer k.mu.Unlock() + if k.Writer != nil { err := k.Writer.Close() k.Writer = nil return err } + return nil } +func (k *kafkaBusWriter[T]) setupWriter() { + k.mu.Lock() + defer k.mu.Unlock() + if k.Writer == nil { + k.Writer = kafka.NewWriter(kafka.WriterConfig{ + Brokers: k.brokers.Addresses(), + Topic: k.topic, + Balancer: &kafka.LeastBytes{}, + Async: true, + Logger: kafka.LoggerFunc(log.Logger.Tracef), + }) + k.Writer.AllowAutoTopicCreation = true + } +} + +func (k *kafkaBusWriter[T]) encodeMessage(msg T) ([]byte, error) { + var buf bytes.Buffer + err := gob.NewEncoder(&buf).Encode(msg) + if err != nil { + return nil, fmt.Errorf("%w: %w", ErrSerializeMessage, err) + } + return buf.Bytes(), nil +} + func NewKafkaMessageBusWriter[T BusMessage[any]](brokers config.ServerAddresses, msg T) MessageBusWriter[T] { return &kafkaBusWriter[T]{ kafkaBus: &kafkaBus[T]{