Skip to content

Commit

Permalink
feat: added method to publish many messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 2c63bb4 commit d70a4fd
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ type MessageBusReader[T BusMessage[any]] interface {
type MessageBusWriter[T BusMessage[any]] interface {
GetMessageType() BusMessageType
Publish(context.Context, T) error
PublishMany(context.Context, []T) error
Close() error
}
1 change: 1 addition & 0 deletions pkg/bus/bus_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
var (
ErrSerializeMessage = errors.New("unable to serialize message")
ErrSendingMessage = errors.New("unable to send message on bus")
ErrEncodingMessage = errors.New("unable to encode message")
)

type kafkaBus[T BusMessage[any]] struct {
Expand Down
98 changes: 80 additions & 18 deletions pkg/bus/bus_kafka_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"context"
"encoding/gob"
"errors"
"fmt"
"sync"

"github.com/ShatteredRealms/go-common-service/pkg/config"
"github.com/ShatteredRealms/go-common-service/pkg/log"
Expand All @@ -19,30 +21,18 @@ type kafkaBusWriter[T BusMessage[any]] struct {

// Publish implements MessageBus.
func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error {
k.mu.Lock()
if k.Writer == nil {
k.Writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: k.brokers.Addresses(),
Topic: k.topic,
Balancer: &kafka.LeastBytes{},
Async: true,
Logger: kafka.LoggerFunc(log.Logger.Tracef),
})
k.Writer.AllowAutoTopicCreation = true
}
k.mu.Unlock()
k.setupWriter()

k.wg.Add(1)
defer k.wg.Done()
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(msg)
val, err := k.encodeMessage(msg)
if err != nil {
return fmt.Errorf("%w: %w", ErrSerializeMessage, err)
return err
}

k.wg.Add(1)
defer k.wg.Done()
err = k.Writer.WriteMessages(ctx, kafka.Message{
Key: []byte(msg.GetId()),
Value: buf.Bytes(),
Value: val,
})
if err != nil {
return fmt.Errorf("%w: %w", ErrSendingMessage, err)
Expand All @@ -51,20 +41,92 @@ func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error {
return nil
}

func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error {
k.wg.Add(1)
defer k.wg.Done()

k.setupWriter()

messages := make([]kafka.Message, len(msgs))
var errs error
errsMu := sync.Mutex{}
wg := sync.WaitGroup{}

wg.Add(len(msgs))
for idx, msg := range msgs {
go func(msg T) {
defer k.wg.Done()
val, err := k.encodeMessage(msg)
if err != nil {
errsMu.Lock()
errs = errors.Join(errs, fmt.Errorf("%w: %w", ErrEncodingMessage, err))
errsMu.Unlock()
return
}

messages[idx] = kafka.Message{
Key: []byte(msg.GetId()),
Value: val,
}
}(msg)
}

wg.Wait()
if errs != nil {
return errs
}

err := k.Writer.WriteMessages(ctx, messages...)
if err != nil {
return fmt.Errorf("%w: %w", ErrSendingMessage, err)
}

return nil
}

func (k *kafkaBusWriter[T]) GetMessageType() BusMessageType {
return BusMessageType(k.topic)
}

func (k *kafkaBusWriter[T]) Close() error {
k.wg.Wait()

k.mu.Lock()
defer k.mu.Unlock()

if k.Writer != nil {
err := k.Writer.Close()
k.Writer = nil
return err
}

return nil
}

func (k *kafkaBusWriter[T]) setupWriter() {
k.mu.Lock()
defer k.mu.Unlock()
if k.Writer == nil {
k.Writer = kafka.NewWriter(kafka.WriterConfig{
Brokers: k.brokers.Addresses(),
Topic: k.topic,
Balancer: &kafka.LeastBytes{},
Async: true,
Logger: kafka.LoggerFunc(log.Logger.Tracef),
})
k.Writer.AllowAutoTopicCreation = true
}
}

func (k *kafkaBusWriter[T]) encodeMessage(msg T) ([]byte, error) {
var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(msg)
if err != nil {
return nil, fmt.Errorf("%w: %w", ErrSerializeMessage, err)
}
return buf.Bytes(), nil
}

func NewKafkaMessageBusWriter[T BusMessage[any]](brokers config.ServerAddresses, msg T) MessageBusWriter[T] {
return &kafkaBusWriter[T]{
kafkaBus: &kafkaBus[T]{
Expand Down

0 comments on commit d70a4fd

Please sign in to comment.