Skip to content

Commit

Permalink
add offset.show-all to control offset logic
Browse files Browse the repository at this point in the history
  • Loading branch information
danielqsj committed Sep 8, 2021
1 parent 60a5243 commit af20ba0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
6 changes: 3 additions & 3 deletions dev/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
image: zookeeper:3.5.8
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
image: wurstmeister/kafka:2.13-2.7.0
ports:
- "9092:9092"
environment:
DOCKER_API_VERSION: 1.22
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
19 changes: 11 additions & 8 deletions kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Exporter struct {
zookeeperClient *kazoo.Kazoo
nextMetadataRefresh time.Time
metadataRefreshInterval time.Duration
offsetShowAll bool
}

type kafkaOpts struct {
Expand All @@ -82,6 +83,7 @@ type kafkaOpts struct {
realm string
keyTabPath string
kerberosAuthType string
offsetShowAll bool
}

// CanReadCertAndKey returns true if the certificate and key files already exists,
Expand Down Expand Up @@ -233,6 +235,7 @@ func NewExporter(opts kafkaOpts, topicFilter string, groupFilter string) (*Expor
zookeeperClient: zookeeperClient,
nextMetadataRefresh: time.Now(),
metadataRefreshInterval: interval,
offsetShowAll: opts.offsetShowAll,
}, nil
}

Expand Down Expand Up @@ -423,7 +426,13 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
}
for _, group := range describeGroups.Groups {
offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
if group.State == "Stable" {
if e.offsetShowAll {
for topic, partitions := range offset {
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
} else {
for _, member := range group.Members {
assignment, err := member.GetMemberAssignment()
if err != nil {
Expand All @@ -436,13 +445,6 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
}
}
}
} else {
//not really know what topic/partition this consumer group subscribe, so ask for all
for topic, partitions := range offset {
for partition := range partitions {
offsetFetchRequest.AddPartition(topic, partition)
}
}
}
ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
Expand Down Expand Up @@ -556,6 +558,7 @@ func main() {
kingpin.Flag("zookeeper.server", "Address (hosts) of zookeeper server.").Default("localhost:2181").StringsVar(&opts.uriZookeeper)
kingpin.Flag("kafka.labels", "Kafka cluster name").Default("").StringVar(&opts.labels)
kingpin.Flag("refresh.metadata", "Metadata refresh interval").Default("30s").StringVar(&opts.metadataRefreshInterval)
kingpin.Flag("offset.show-all", "Whether show the offset/lag for all consumer group, otherwise, only show connected consumer groups").Default("true").BoolVar(&opts.offsetShowAll)

kingpin.Version(version.Print("kafka_exporter"))
kingpin.HelpFlag.Short('h')
Expand Down

0 comments on commit af20ba0

Please sign in to comment.