Skip to content

Commit

Permalink
feat: kafka bus tracer support
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 695fe8e commit 841b08a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
13 changes: 12 additions & 1 deletion pkg/bus/bus.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package bus

import "context"
import (
"context"

"go.opentelemetry.io/otel/attribute"
)

type BusMessageType string

Expand Down Expand Up @@ -34,3 +38,10 @@ type MessageBusWriter[T BusMessage[any]] interface {
PublishMany(context.Context, []T) error
Close() error
}

func BusMessageAttributes(msg BusMessage[any]) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String("sro.bus.message.type", string(msg.GetType())),
attribute.String("sro.bus.message.id", msg.GetId()),
}
}
3 changes: 3 additions & 0 deletions pkg/bus/bus_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/ShatteredRealms/go-common-service/pkg/config"
"go.opentelemetry.io/otel/trace"
)

var (
Expand All @@ -17,6 +18,8 @@ type kafkaBus[T BusMessage[any]] struct {
brokers config.ServerAddresses
topic string

tracer trace.Tracer

mu sync.Mutex
wg sync.WaitGroup
}
2 changes: 2 additions & 0 deletions pkg/bus/bus_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/config"
"github.com/ShatteredRealms/go-common-service/pkg/log"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
)

// MessageBusReader is for reading messages from the message bus synchronously.
Expand Down Expand Up @@ -236,6 +237,7 @@ func NewKafkaMessageBusReader[T BusMessage[any]](brokers config.ServerAddresses,
kafkaBus: &kafkaBus[T]{
brokers: brokers,
topic: string(msg.GetType()),
tracer: otel.Tracer(fmt.Sprintf("sro.bus.kafka.reader.%s", msg.GetType())),
},
groupId: groupId,
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/bus/bus_kafka_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ShatteredRealms/go-common-service/pkg/config"
"github.com/ShatteredRealms/go-common-service/pkg/log"
"github.com/segmentio/kafka-go"
"go.opentelemetry.io/otel"
)

// MessageBusWriter is for writing message asynchronously to the message bus.
Expand All @@ -23,13 +24,18 @@ type kafkaBusWriter[T BusMessage[any]] struct {

// Publish implements MessageBus.
func (k *kafkaBusWriter[T]) Publish(ctx context.Context, msg T) error {
ctx, span := k.tracer.Start(ctx, "Publish")
defer span.End()
span.SetAttributes(BusMessageAttributes(msg)...)
k.setupWriter()

val, err := k.encodeMessage(msg)
if err != nil {
return err
}

ctx, innerSpan := k.tracer.Start(ctx, "Publish.SendMessages")
defer innerSpan.End()
k.wg.Add(1)
defer k.wg.Done()
err = k.Writer.WriteMessages(ctx, kafka.Message{
Expand All @@ -47,6 +53,14 @@ func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error {
k.wg.Add(1)
defer k.wg.Done()

if len(msgs) == 0 {
return nil
}

ctx, span := k.tracer.Start(ctx, "PublishMany")
span.SetAttributes(BusMessageAttributes(msgs[0])[0])
defer span.End()

k.setupWriter()

messages := make([]kafka.Message, len(msgs))
Expand All @@ -55,6 +69,7 @@ func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error {
errsMu := sync.Mutex{}
wg := sync.WaitGroup{}

ctx, innerSpan := k.tracer.Start(ctx, "PublishMany.CreateMessages")
for chunk := range slices.Chunk(msgs, runtime.NumCPU()) {
wg.Add(1)
go func(chunk []T) {
Expand All @@ -79,10 +94,13 @@ func (k *kafkaBusWriter[T]) PublishMany(ctx context.Context, msgs []T) error {
}

wg.Wait()
innerSpan.End()
if errs != nil {
return errs
}

ctx, innerSpan = k.tracer.Start(ctx, "PublishMany.SendMessages")
defer innerSpan.End()
err := k.Writer.WriteMessages(ctx, messages...)
if err != nil {
return fmt.Errorf("%w: %w", ErrSendingMessage, err)
Expand Down Expand Up @@ -139,6 +157,7 @@ func NewKafkaMessageBusWriter[T BusMessage[any]](brokers config.ServerAddresses,
kafkaBus: &kafkaBus[T]{
brokers: brokers,
topic: string(msg.GetType()),
tracer: otel.Tracer(fmt.Sprintf("sro.bus.kafka.reader.%s", msg.GetType())),
},
}
}
4 changes: 3 additions & 1 deletion pkg/srospan/attributes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package srospan

import "go.opentelemetry.io/otel/attribute"
import (
"go.opentelemetry.io/otel/attribute"
)

func SourceOwnerId(val string) attribute.KeyValue {
return attribute.String("sro.source.owner.id", val)
Expand Down

0 comments on commit 841b08a

Please sign in to comment.