Skip to content

Commit

Permalink
feat: kafak bus reader tracer support
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 841b08a commit 58ba3d5
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pkg/bus/bus_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,21 @@ type kafkaBusReader[T BusMessage[any]] struct {

// ReceiveMessages implements MessageBus.
func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
ctx, span := k.tracer.Start(ctx, "fetch")
defer span.End()
if k.isResetting {
ctx, innerSpan := k.tracer.Start(ctx, "fetch")
log.Logger.WithContext(ctx).Info("Waiting for reset to finish")
timer := time.NewTimer(30 * time.Second)
select {
case <-k.resetFinished:
case <-timer.C:
if k.isResetting {
innerSpan.End()
return nil, errors.New("resetting took too long")
}
}
innerSpan.End()
}

if k.currentMessage != nil {
Expand All @@ -57,6 +62,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
})
}

ctx, innerSpan := k.tracer.Start(ctx, "fetch.message")
wg := sync.WaitGroup{}
wg.Add(1)
var err error
Expand All @@ -66,6 +72,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
}()
k.mu.Unlock()
wg.Wait()
innerSpan.End()

if k.isResetting {
log.Logger.WithContext(ctx).Info("Reset started, skipping message")
Expand All @@ -75,6 +82,8 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
return nil, fmt.Errorf("%w: %w", ErrSerializeMessage, err)
}

ctx, innerSpan = k.tracer.Start(ctx, "fetch.decode")
defer innerSpan.End()
var data T
dec := gob.NewDecoder(bytes.NewReader(k.currentMessage.Value))
if err := dec.Decode(&data); err != nil {
Expand All @@ -85,6 +94,8 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
}

func (k *kafkaBusReader[T]) Reset(ctx context.Context) error {
ctx, span := k.tracer.Start(ctx, "reset")
defer span.End()
// Prevent multiple resets from happening at the same time
k.mu.Lock()
defer k.mu.Unlock()
Expand All @@ -104,20 +115,25 @@ func (k *kafkaBusReader[T]) Reset(ctx context.Context) error {
}

// Connect to the kafka cluster
ctx, innerSpan := k.tracer.Start(ctx, "reset.connect")
log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Info("Connecting to Kafka")
conn, err := kafka.DialContext(ctx, "tcp", k.brokers.Addresses()[0])
if err != nil {
return fmt.Errorf("unable to dial kafka: %w", err)
}
defer conn.Close()
innerSpan.End()

// Read the partitions for the topic
ctx, innerSpan = k.tracer.Start(ctx, "reset.partitions.get")
log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Info("Reading partitions")
partitions, err := conn.ReadPartitions(k.topic)
innerSpan.End()
if err != nil {
return fmt.Errorf("unable to read partitions: %w", err)
}

ctx, innerSpan = k.tracer.Start(ctx, "reset.partitions.offsets")
wg := sync.WaitGroup{}
errMu := sync.Mutex{}
offsetMu := sync.Mutex{}
Expand Down Expand Up @@ -157,30 +173,37 @@ func (k *kafkaBusReader[T]) Reset(ctx context.Context) error {

// Wait for all the offsets to be read
wg.Wait()
innerSpan.End()
if outErrors != nil {
return outErrors
}

ctx, innerSpan = k.tracer.Start(ctx, "reset.consumergroup.get")
log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Infof("Getting consumer group %s", k.groupId)
// Set offset to the beginning for this consumer group and topic
group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
ID: k.groupId,
Brokers: k.brokers.Addresses(),
Topics: []string{k.topic},
})
innerSpan.End()
if err != nil {
return fmt.Errorf("unable to create consumer group: %w", err)
}
defer group.Close()

ctx, innerSpan = k.tracer.Start(ctx, "reset.consumergroup.generation.next")
log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Info("Getting next generation")
generation, err := group.Next(ctx)
innerSpan.End()
if err != nil {
return fmt.Errorf("unable to get next generation: %w", err)
}

ctx, innerSpan = k.tracer.Start(ctx, "reset.consumergroup.offsets.commit")
log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Info("Commiting offsets")
err = generation.CommitOffsets(map[string]map[int]int64{k.topic: offsets})
innerSpan.End()
if err != nil {
return fmt.Errorf("unable to commit offsets: %w", err)
}
Expand Down

0 comments on commit 58ba3d5

Please sign in to comment.