Skip to content

Commit f52e2d2

Browse files
daniel-garciakd7lxl
authored andcommitted
fixes #75: allow labels to be hashed into topic partitions
1 parent 6a1dbf5 commit f52e2d2

File tree

5 files changed

+123
-8
lines changed

5 files changed

+123
-8
lines changed

config.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@ package main
1616

1717
import (
1818
"fmt"
19-
dto "github.com/prometheus/client_model/go"
20-
"github.com/prometheus/common/expfmt"
21-
"gopkg.in/yaml.v2"
2219
"os"
2320
"strings"
2421
"text/template"
22+
"time"
23+
24+
dto "github.com/prometheus/client_model/go"
25+
"github.com/prometheus/common/expfmt"
26+
"gopkg.in/yaml.v2"
2527

2628
"github.com/sirupsen/logrus"
2729
)
2830

2931
var (
3032
kafkaBrokerList = "kafka:9092"
3133
kafkaTopic = "metrics"
34+
kafkaPartitionLabels []string
35+
kafkaMetadataTimeout = time.Second * 10
36+
kafkaMetadataInterval = time.Minute * 5
3237
topicTemplate *template.Template
3338
match = make(map[string]*dto.MetricFamily, 0)
3439
basicauth = false
@@ -63,6 +68,23 @@ func init() {
6368
kafkaTopic = value
6469
}
6570

71+
if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
72+
kafkaPartitionLabels = strings.Split(value, ",")
73+
}
74+
75+
if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
76+
d, err := time.ParseDuration(value)
77+
if err != nil {
78+
logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
79+
} else {
80+
if d < 0 {
81+
logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
82+
} else {
83+
kafkaMetadataTimeout = d
84+
}
85+
}
86+
}
87+
6688
if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
6789
basicauth = true
6890
basicauthUsername = value

handlers.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ package main
1616

1717
import (
1818
"fmt"
19+
"hash/fnv"
1920
"io/ioutil"
2021
"net/http"
22+
"strings"
2123

2224
"github.com/gin-gonic/gin"
2325
"github.com/sirupsen/logrus"
@@ -62,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
6264
return
6365
}
6466

65-
for topic, metrics := range metricsPerTopic {
66-
t := topic
67+
for topicAndHashKey, metrics := range metricsPerTopic {
68+
69+
topic, partitionID, err := getPartitionAndTopic(topicAndHashKey)
70+
if err != nil {
71+
continue
72+
}
73+
6774
part := kafka.TopicPartition{
68-
Partition: kafka.PartitionAny,
69-
Topic: &t,
75+
Partition: partitionID,
76+
Topic: &topic,
7077
}
7178
for _, metric := range metrics {
7279
objectsWritten.Add(float64(1))
@@ -87,3 +94,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
8794

8895
}
8996
}
97+
98+
func getPartitionAndTopic(topic string) (string, int32, error) {
99+
parts := strings.Split(topic, "|")
100+
101+
if len(parts) == 1 {
102+
return parts[0], kafka.PartitionAny, nil
103+
}
104+
h := fnv.New32a()
105+
h.Write([]byte(parts[1]))
106+
107+
v, ok := topicPartitionCount.Load(parts[0])
108+
if !ok {
109+
logrus.WithField("topic", parts[0]).Error("did not find metadata requested topic")
110+
return topic, kafka.PartitionAny, fmt.Errorf("could not")
111+
}
112+
return parts[0], int32(h.Sum32() % uint32(v.(int))), nil
113+
}

main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package main
1616

1717
import (
18+
"context"
1819
"time"
1920

2021
"github.com/confluentinc/confluent-kafka-go/kafka"
@@ -27,6 +28,9 @@ import (
2728
func main() {
2829
logrus.Info("creating kafka producer")
2930

31+
ctx, cancel := context.WithCancel(context.Background())
32+
defer cancel()
33+
3034
kafkaConfig := kafka.ConfigMap{
3135
"bootstrap.servers": kafkaBrokerList,
3236
"compression.codec": kafkaCompression,
@@ -68,6 +72,12 @@ func main() {
6872
logrus.WithError(err).Fatal("couldn't create kafka producer")
6973
}
7074

75+
if kafkaPartitionLabels != nil {
76+
if err := syncTopicMetadata(ctx, producer); err != nil {
77+
logrus.WithError(err).Fatal("couldn't fetch topic metadata")
78+
}
79+
}
80+
7181
r := gin.New()
7282

7383
r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())

serializers.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,10 +116,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
116116
}
117117

118118
func topic(labels map[string]string) string {
119-
var buf bytes.Buffer
119+
var buf, buf2 bytes.Buffer
120120
if err := topicTemplate.Execute(&buf, labels); err != nil {
121121
return ""
122122
}
123+
for _, s := range kafkaPartitionLabels {
124+
v, ok := labels[s]
125+
if ok {
126+
if _, err := buf2.WriteString(v); err != nil {
127+
return ""
128+
}
129+
}
130+
}
131+
if buf2.Len() > 0 {
132+
buf.WriteString("|")
133+
buf.WriteString(buf2.String())
134+
}
123135
return buf.String()
124136
}
125137

topic_metadata.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"math"
6+
"sync"
7+
"time"
8+
9+
"github.com/confluentinc/confluent-kafka-go/kafka"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
var topicPartitionCount sync.Map
14+
15+
type metaDataFetcher interface {
16+
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
17+
}
18+
19+
func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
20+
21+
if err := processMetadata(producer); err != nil {
22+
return err
23+
}
24+
go func() {
25+
select {
26+
case <-ctx.Done():
27+
return
28+
29+
case <-time.After(kafkaMetadataInterval):
30+
if err := processMetadata(producer); err != nil {
31+
logrus.WithError(err).Error("could not fetch topic metadata")
32+
}
33+
}
34+
}()
35+
return nil
36+
}
37+
38+
func processMetadata(producer metaDataFetcher) error {
39+
metadata, err := producer.GetMetadata(nil, true, int(math.Ceil(kafkaMetadataTimeout.Seconds())))
40+
if err != nil {
41+
return err
42+
}
43+
for name, topic := range metadata.Topics {
44+
topicPartitionCount.Store(name, len(topic.Partitions))
45+
}
46+
return nil
47+
}

0 commit comments

Comments
 (0)