
Commit
Fixing a bug, changing some indents, choosing consistent way of throwing error
nikunjy committed Apr 9, 2021
1 parent d7f1a13 commit 2f348b4
Showing 14 changed files with 853 additions and 142 deletions.
Binary file removed .DS_Store
Binary file not shown.
1 change: 1 addition & 0 deletions .gitignore
@@ -23,3 +23,4 @@ kafka_exporter

# Test configuration
test/
.DS_Store
1 change: 1 addition & 0 deletions go.mod
@@ -10,6 +10,7 @@ require (
	github.com/eapache/queue v1.1.0 // indirect
	github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
	github.com/pkg/errors v0.9.1
	github.com/pkg/profile v1.2.1 // indirect
	github.com/prometheus/client_golang v1.9.0
	github.com/prometheus/common v0.19.0
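The new github.com/pkg/errors dependency drives the error-handling changes in kafka_exporter.go below. As a minimal standalone sketch (not code from this commit), errors.Wrap annotates an error with context while keeping the original cause recoverable via errors.Cause:

	package main

	import (
		"fmt"

		"github.com/pkg/errors"
	)

	func dial() error {
		return errors.New("connection refused")
	}

	func main() {
		if err := dial(); err != nil {
			wrapped := errors.Wrap(err, "error connecting to zookeeper")
			fmt.Println(wrapped)               // error connecting to zookeeper: connection refused
			fmt.Println(errors.Cause(wrapped)) // connection refused
		}
	}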
295 changes: 153 additions & 142 deletions kafka_exporter.go
@@ -15,7 +15,8 @@ import (
"time"

"github.com/Shopify/sarama"
kazoo "github.com/krallistic/kazoo-go"
"github.com/krallistic/kazoo-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
plog "github.com/prometheus/common/log"
@@ -137,7 +138,10 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor

case "plain":
default:
plog.Fatalf("invalid sasl mechanism \"%s\": can only be \"scram-sha256\", \"scram-sha512\" or \"plain\"", opts.saslMechanism)
return nil, fmt.Errorf(
`invalid sasl mechanism "%s": can only be "scram-sha256", "scram-sha512" or "plain"`,
opts.saslMechanism,
)
}

config.Net.SASL.Enable = true
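A side effect of the rewrite above: switching from plog.Fatalf to fmt.Errorf with a raw string literal (backticks) drops the \" escapes the old call needed. A standalone sketch of the same formatting, using a hypothetical invalid mechanism value:

	package main

	import "fmt"

	func main() {
		mechanism := "gssapi" // hypothetical invalid value, for illustration only
		err := fmt.Errorf(
			`invalid sasl mechanism "%s": can only be "scram-sha256", "scram-sha512" or "plain"`,
			mechanism,
		)
		fmt.Println(err)
	}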
@@ -164,44 +168,46 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor
			if ca, err := ioutil.ReadFile(opts.tlsCAFile); err == nil {
				config.Net.TLS.Config.RootCAs.AppendCertsFromPEM(ca)
			} else {
				plog.Fatalln(err)
				return nil, err
			}
		}

		canReadCertAndKey, err := CanReadCertAndKey(opts.tlsCertFile, opts.tlsKeyFile)
		if err != nil {
			plog.Fatalln(err)
			return nil, errors.Wrap(err, "error reading cert and key")
		}
		if canReadCertAndKey {
			cert, err := tls.LoadX509KeyPair(opts.tlsCertFile, opts.tlsKeyFile)
			if err == nil {
				config.Net.TLS.Config.Certificates = []tls.Certificate{cert}
			} else {
				plog.Fatalln(err)
				return nil, err
			}
		}
	}

	if opts.useZooKeeperLag {
		plog.Infoln("Using zookeeper lag, so connecting to zookeeper")
		zookeeperClient, err = kazoo.NewKazoo(opts.uriZookeeper, nil)
		if err != nil {
			return nil, errors.Wrap(err, "error connecting to zookeeper")
		}
	}

	interval, err := time.ParseDuration(opts.metadataRefreshInterval)
	if err != nil {
		plog.Errorln("Cannot parse metadata refresh interval")
		panic(err)
		return nil, errors.Wrap(err, "Cannot parse metadata refresh interval")
	}

	config.Metadata.RefreshFrequency = interval

	client, err := sarama.NewClient(opts.uri, config)
	if err != nil {
		plog.Errorln("Error Init Kafka Client")
		panic(err)
		return nil, errors.Wrap(err, "Error Init Kafka Client")
	}

	plog.Infoln("Done Init Clients")

	// Init our exporter.
	return &Exporter{
		client: client,
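The through-line of this hunk is that NewExporter now reports failures to its caller instead of killing the process with plog.Fatalln or panic. A minimal sketch of the resulting call-site pattern; NewThing is a stand-in for illustration, not code from this repository:

	package main

	import (
		"log"

		"github.com/pkg/errors"
	)

	type Thing struct{ uri string }

	func NewThing(uri string) (*Thing, error) {
		if uri == "" {
			return nil, errors.New("uri must not be empty")
		}
		return &Thing{uri: uri}, nil
	}

	func main() {
		t, err := NewThing("")
		if err != nil {
			// The fatal exit happens once, at the top level, not inside the constructor.
			log.Fatalln(errors.Wrap(err, "error creating thing"))
		}
		_ = t
	}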
@@ -263,103 +269,104 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

	getTopicMetrics := func(topic string) {
		defer wg.Done()
		if !e.topicFilter.MatchString(topic) {
			return
		}
		partitions, err := e.client.Partitions(topic)
		if err != nil {
			plog.Errorf("Cannot get partitions of topic %s: %v", topic, err)
			return
		}
		ch <- prometheus.MustNewConstMetric(
			topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
		)
		e.mu.Lock()
		offset[topic] = make(map[int32]int64, len(partitions))
		e.mu.Unlock()
		for _, partition := range partitions {
			broker, err := e.client.Leader(topic, partition)
			if err != nil {
				plog.Errorf("Cannot get leader of topic %s partition %d: %v", topic, partition, err)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			currentOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				plog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, err)
			} else {
				e.mu.Lock()
				offset[topic][partition] = currentOffset
				e.mu.Unlock()
				ch <- prometheus.MustNewConstMetric(
					topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			oldestOffset, err := e.client.GetOffset(topic, partition, sarama.OffsetOldest)
			if err != nil {
				plog.Errorf("Cannot get oldest offset of topic %s partition %d: %v", topic, partition, err)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			replicas, err := e.client.Replicas(topic, partition)
			if err != nil {
				plog.Errorf("Cannot get replicas of topic %s partition %d: %v", topic, partition, err)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			inSyncReplicas, err := e.client.InSyncReplicas(topic, partition)
			if err != nil {
				plog.Errorf("Cannot get in-sync replicas of topic %s partition %d: %v", topic, partition, err)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			if broker != nil && replicas != nil && len(replicas) > 0 && broker.ID() == replicas[0] {
				ch <- prometheus.MustNewConstMetric(
					topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
				)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			if replicas != nil && inSyncReplicas != nil && len(inSyncReplicas) < len(replicas) {
				ch <- prometheus.MustNewConstMetric(
					topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
				)
			} else {
				ch <- prometheus.MustNewConstMetric(
					topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
				)
			}

			if e.useZooKeeperLag {
				ConsumerGroups, err := e.zookeeperClient.Consumergroups()
				if err != nil {
					plog.Errorf("Cannot get consumer group %v", err)
				}

				for _, group := range ConsumerGroups {
					offset, _ := group.FetchOffset(topic, partition)
					if offset > 0 {
						consumerGroupLag := currentOffset - offset
						ch <- prometheus.MustNewConstMetric(
							consumergroupLagZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
						)
					}
				}
			}
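For context, the hunk above follows the Prometheus custom-collector pattern: metrics are built on the fly with prometheus.MustNewConstMetric inside Collect rather than via pre-registered gauge vectors. A minimal, self-contained sketch of that pattern (the collector type, value, and port here are illustrative assumptions, not code from this commit):

	package main

	import (
		"net/http"

		"github.com/prometheus/client_golang/prometheus"
		"github.com/prometheus/client_golang/prometheus/promhttp"
	)

	type demoCollector struct {
		partitions *prometheus.Desc
	}

	func (c *demoCollector) Describe(ch chan<- *prometheus.Desc) {
		ch <- c.partitions
	}

	func (c *demoCollector) Collect(ch chan<- prometheus.Metric) {
		// In kafka_exporter this value would come from client.Partitions(topic).
		ch <- prometheus.MustNewConstMetric(c.partitions, prometheus.GaugeValue, 3, "demo-topic")
	}

	func main() {
		c := &demoCollector{partitions: prometheus.NewDesc(
			"kafka_topic_partitions", "Number of partitions for this topic", []string{"topic"}, nil,
		)}
		prometheus.MustRegister(c)
		http.Handle("/metrics", promhttp.Handler())
		http.ListenAndServe(":9308", nil)
	}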
@@ -408,60 +415,64 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
			ch <- prometheus.MustNewConstMetric(
				consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
			)
			offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
			if err != nil {
				plog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err)
				continue
			}

			for topic, partitions := range offsetFetchResponse.Blocks {
				// If the topic is not consumed by that consumer group, skip it
				topicConsumed := false
				for _, offsetFetchResponseBlock := range partitions {
					// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
					if offsetFetchResponseBlock.Offset != -1 {
						topicConsumed = true
						break
					}
				}
				if !topicConsumed {
					continue
				}

				var currentOffsetSum int64
				var lagSum int64
				for partition, offsetFetchResponseBlock := range partitions {
					err := offsetFetchResponseBlock.Err
					if err != sarama.ErrNoError {
						plog.Errorf("Error for partition %d :%v", partition, err.Error())
						continue
					}
					currentOffset := offsetFetchResponseBlock.Offset
					currentOffsetSum += currentOffset
					ch <- prometheus.MustNewConstMetric(
						consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
					)
					e.mu.Lock()
					if offset, ok := offset[topic][partition]; ok {
						// If the topic is consumed by that consumer group, but no offset associated with the partition
						// forcing lag to -1 to be able to alert on that
						var lag int64
						if offsetFetchResponseBlock.Offset == -1 {
							lag = -1
						} else {
							lag = offset - offsetFetchResponseBlock.Offset
							lagSum += lag
						}
						ch <- prometheus.MustNewConstMetric(
							consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
						)
					} else {
						plog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition)
					}
					e.mu.Unlock()
				}
				ch <- prometheus.MustNewConstMetric(
					consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
				)
				ch <- prometheus.MustNewConstMetric(
					consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
				)
			}
		}
	}
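The lag arithmetic above is worth pulling out of the collector for clarity. A hedged sketch of the same logic in isolation: lag is the newest broker offset minus the group's committed offset, with -1 kept as a sentinel when the group has no commit for the partition (the function name and sample values are illustrative):

	package main

	import "fmt"

	func consumerGroupLag(newestOffset, committedOffset int64) int64 {
		// Kafka returns -1 when no offset is committed for a topic-partition;
		// lag is forced to -1 so it can be alerted on.
		if committedOffset == -1 {
			return -1
		}
		return newestOffset - committedOffset
	}

	func main() {
		fmt.Println(consumerGroupLag(1042, 1000)) // 42
		fmt.Println(consumerGroupLag(1042, -1))   // -1
	}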
