diff --git a/pkg/bus/bus_kafka_reader.go b/pkg/bus/bus_kafka_reader.go index 0bf752d..6dbcc3a 100644 --- a/pkg/bus/bus_kafka_reader.go +++ b/pkg/bus/bus_kafka_reader.go @@ -116,7 +116,7 @@ func (k *kafkaBusReader[T]) Reset(ctx context.Context) error { log.Logger.WithContext(ctx).WithField("func", "Bus Reset").Infof("Getting offsets for %d partitions", len(partitions)) for _, partition := range partitions { wg.Add(1) - go func() { + go func(partition kafka.Partition) { defer wg.Done() leaderAddr := fmt.Sprintf("%s:%d", partition.Leader.Host, partition.Leader.Port) c, err := kafka.DialLeader(ctx, "tcp", leaderAddr, k.topic, partition.ID) @@ -139,7 +139,7 @@ func (k *kafkaBusReader[T]) Reset(ctx context.Context) error { offsetMu.Lock() offsets[partition.ID] = offset offsetMu.Unlock() - }() + }(partition) } // Wait for all the offsets to be read