Skip to content

Commit 4aa5390

Browse files
committed
fixes #75: allow labels to be hashed into topic partitions
1 parent 8722897 commit 4aa5390

File tree

8 files changed

+127
-9
lines changed

8 files changed

+127
-9
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@
1111
# Output of the go coverage tool, specifically when used with LiteIDE
1212
*.out
1313

14+
/vendor
1415
/prometheus-kafka-adapter

config.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,24 @@ package main
1616

1717
import (
1818
"fmt"
19-
dto "github.com/prometheus/client_model/go"
20-
"github.com/prometheus/common/expfmt"
21-
"gopkg.in/yaml.v2"
2219
"os"
2320
"strings"
2421
"text/template"
22+
"time"
23+
24+
dto "github.com/prometheus/client_model/go"
25+
"github.com/prometheus/common/expfmt"
26+
"gopkg.in/yaml.v2"
2527

2628
"github.com/sirupsen/logrus"
2729
)
2830

2931
var (
3032
kafkaBrokerList = "kafka:9092"
3133
kafkaTopic = "metrics"
34+
kafkaPartitionLabels []string
35+
kafkaMetadataTimeout = time.Second * 10
36+
kafkaMetadataInterval = time.Minute * 5
3237
topicTemplate *template.Template
3338
match = make(map[string]*dto.MetricFamily, 0)
3439
basicauth = false
@@ -63,6 +68,23 @@ func init() {
6368
kafkaTopic = value
6469
}
6570

71+
if value := os.Getenv("KAFKA_PARTITION_LABELS"); value != "" {
72+
kafkaPartitionLabels = strings.Split(value, ",")
73+
}
74+
75+
if value := os.Getenv("KAFKA_METADATA_TIMEOUT"); value != "" {
76+
d, err := time.ParseDuration(value)
77+
if err != nil {
78+
logrus.WithError(err).Errorf("KAFKA_METADATA_TIMEOUT parsing failed, using default")
79+
} else {
80+
if d < 0 {
81+
logrus.Errorf("KAFKA_METADATA_TIMEOUT does not support negative timeout")
82+
} else {
83+
kafkaMetadataTimeout = d
84+
}
85+
}
86+
}
87+
6688
if value := os.Getenv("BASIC_AUTH_USERNAME"); value != "" {
6789
basicauth = true
6890
basicauthUsername = value

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ require (
44
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
55
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
66
github.com/beorn7/perks v1.0.1 // indirect
7-
github.com/confluentinc/confluent-kafka-go v1.6.1
7+
github.com/confluentinc/confluent-kafka-go v1.7.0
88
github.com/gin-gonic/contrib v0.0.0-20191209060500-d6e26eeaa607
99
github.com/gin-gonic/gin v1.6.3
1010
github.com/gogo/protobuf v1.3.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
88
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
99
github.com/confluentinc/confluent-kafka-go v1.6.1 h1:YxM/UtMQ2vgJX2gIgeJFUD0ANQYTEvfo4Cs4qKUlmGE=
1010
github.com/confluentinc/confluent-kafka-go v1.6.1/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
11+
github.com/confluentinc/confluent-kafka-go v1.7.0 h1:tXh3LWb2Ne0WiU3ng4h5qiGA9XV61rz46w60O+cq8bM=
12+
github.com/confluentinc/confluent-kafka-go v1.7.0/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
1113
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1214
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1315
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

handlers.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
package main
1616

1717
import (
18+
"hash/fnv"
1819
"io/ioutil"
1920
"net/http"
21+
"strings"
2022

2123
"github.com/gin-gonic/gin"
2224
"github.com/sirupsen/logrus"
@@ -62,10 +64,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
6264
}
6365

6466
for topic, metrics := range metricsPerTopic {
65-
t := topic
66-
part := kafka.TopicPartition{
67-
Partition: kafka.PartitionAny,
68-
Topic: &t,
67+
parts := strings.Split(topic, "|")
68+
var part kafka.TopicPartition
69+
70+
if len(parts) == 1 {
71+
part = kafka.TopicPartition{
72+
Partition: kafka.PartitionAny,
73+
Topic: &parts[0],
74+
}
75+
} else {
76+
77+
part = kafka.TopicPartition{
78+
Partition: getPartition(parts[0], parts[1]),
79+
Topic: &parts[0],
80+
}
6981
}
7082
for _, metric := range metrics {
7183
err := producer.Produce(&kafka.Message{
@@ -83,3 +95,15 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
8395

8496
}
8597
}
98+
99+
func getPartition(topic string, hashKey string) int32 {
100+
h := fnv.New32a()
101+
h.Write([]byte(hashKey))
102+
103+
v, ok := topicPartitionCount.Load(topic)
104+
if !ok {
105+
logrus.WithField("topic", topic).Error("did not find metadata requested topic")
106+
return 0
107+
}
108+
return int32(h.Sum32() % uint32(v.(int)))
109+
}

main.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package main
1616

1717
import (
18+
"context"
1819
"time"
1920

2021
"github.com/confluentinc/confluent-kafka-go/kafka"
@@ -28,6 +29,9 @@ import (
2829
func main() {
2930
log.Info("creating kafka producer")
3031

32+
ctx, cancel := context.WithCancel(context.Background())
33+
defer cancel()
34+
3135
kafkaConfig := kafka.ConfigMap{
3236
"bootstrap.servers": kafkaBrokerList,
3337
"compression.codec": kafkaCompression,
@@ -69,6 +73,12 @@ func main() {
6973
logrus.WithError(err).Fatal("couldn't create kafka producer")
7074
}
7175

76+
if kafkaPartitionLabels != nil {
77+
if err := syncTopicMetadata(ctx, producer); err != nil {
78+
logrus.WithError(err).Fatal("couldn't fetch topic metadata")
79+
}
80+
}
81+
7282
r := gin.New()
7383

7484
r.Use(ginrus.Ginrus(logrus.StandardLogger(), time.RFC3339, true), gin.Recovery())

serializers.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,22 @@ func NewAvroJSONSerializer(schemaPath string) (*AvroJSONSerializer, error) {
113113
}
114114

115115
func topic(labels map[string]string) string {
116-
var buf bytes.Buffer
116+
var buf, buf2 bytes.Buffer
117117
if err := topicTemplate.Execute(&buf, labels); err != nil {
118118
return ""
119119
}
120+
for _, s := range kafkaPartitionLabels {
121+
v, ok := labels[s]
122+
if ok {
123+
if _, err := buf2.WriteString(v); err != nil {
124+
return ""
125+
}
126+
}
127+
}
128+
if buf2.Len() > 0 {
129+
buf.WriteString("|")
130+
buf.WriteString(buf2.String())
131+
}
120132
return buf.String()
121133
}
122134

topic_metadata.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"math"
6+
"sync"
7+
"time"
8+
9+
"github.com/confluentinc/confluent-kafka-go/kafka"
10+
"github.com/sirupsen/logrus"
11+
)
12+
13+
// topicPartitionCount caches the partition count per topic name
// (map[string]int semantics). It is written by the metadata refresh
// goroutine in syncTopicMetadata and read concurrently by request
// handlers (getPartition), hence sync.Map rather than a plain map.
var topicPartitionCount sync.Map

// metaDataFetcher abstracts the one method of *kafka.Producer this file
// needs, so metadata syncing can be tested with a fake. Note the timeout
// parameter is in milliseconds, per the confluent-kafka-go API.
type metaDataFetcher interface {
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
}
18+
19+
func syncTopicMetadata(ctx context.Context, producer metaDataFetcher) error {
20+
21+
if err := processMetadata(producer); err != nil {
22+
return err
23+
}
24+
go func() {
25+
select {
26+
case <-ctx.Done():
27+
return
28+
29+
case <-time.After(kafkaMetadataInterval):
30+
if err := processMetadata(producer); err != nil {
31+
logrus.WithError(err).Error("could not fetch topic metadata")
32+
}
33+
}
34+
}()
35+
return nil
36+
}
37+
38+
func processMetadata(producer metaDataFetcher) error {
39+
metadata, err := producer.GetMetadata(nil, true, int(math.Ceil(kafkaMetadataTimeout.Seconds())))
40+
if err != nil {
41+
return err
42+
}
43+
for name, topic := range metadata.Topics {
44+
topicPartitionCount.Store(name, len(topic.Partitions))
45+
}
46+
return nil
47+
}

0 commit comments

Comments
 (0)