1.0.0 #1
Open · wants to merge 53 commits into master

Commits (53)
8df85ae
added the kafka topic read program
Kaali09 Dec 26, 2019
c830081
added the kafka1 and kafak2 file
Kaali09 Dec 27, 2019
094fb07
DO-1943 added the prom data struct
Kaali09 Jan 2, 2020
a696486
DO-1943 added the prometheus metrics
Kaali09 Jan 3, 2020
4a83359
added the latest code
Kaali09 Jan 4, 2020
fd8c67e
updated the script
Kaali09 Jan 5, 2020
79fc0ad
fixes for api -version string
Kaali09 Jan 5, 2020
8003c8f
fixes for partion number incorrect
Kaali09 Jan 5, 2020
5ed1ac6
fixes for duplicate
Kaali09 Jan 5, 2020
f4cb7cf
updated the script
Kaali09 Jan 7, 2020
f582dba
added the kafka2.go file
Kaali09 Jan 7, 2020
3c6a676
test file
Kaali09 Feb 13, 2020
3c1677c
Update test.yaml
Kaali09 Feb 13, 2020
62ccb49
Update test.yaml
Kaali09 Feb 13, 2020
1d8e573
Update test.yaml
Kaali09 Feb 13, 2020
a225c03
Update test.yaml
Kaali09 Feb 13, 2020
5a2a717
Create secrets.yaml
Kaali09 Feb 13, 2020
0c706fd
test file
Kaali09 Feb 18, 2020
30c01e0
Update secrets.yaml
Kaali09 Feb 18, 2020
fd0ca41
added
Kaali09 Feb 18, 2020
62bf607
test
Kaali09 Feb 18, 2020
eadc929
Update devcon
Kaali09 Feb 19, 2020
46a5f57
Update devcon
Kaali09 Feb 19, 2020
7d8641f
Update devcon
Kaali09 Feb 19, 2020
0ff6a6e
Update devcon
Kaali09 Feb 19, 2020
5d157fb
Update devcon
Kaali09 Feb 19, 2020
eee65d5
test
Kaali09 Feb 19, 2020
583f6ee
pls work
Kaali09 Feb 19, 2020
7e2bdca
Update devcon
Kaali09 Feb 19, 2020
04de263
Update devcon
Kaali09 Feb 19, 2020
c34ff8f
Update devcon
Kaali09 Feb 19, 2020
ce2c0b6
Update devcon
Kaali09 Feb 19, 2020
d28ee61
Update devcon
Kaali09 Feb 19, 2020
dd841ad
Update devcon
Kaali09 Feb 19, 2020
033d329
Update devcon
Kaali09 Feb 19, 2020
049e884
Update devcon
Kaali09 Feb 19, 2020
67fb7a9
test
Kaali09 Feb 19, 2020
345ecf2
dei
Kaali09 Feb 19, 2020
d9b4810
hey
Kaali09 Feb 19, 2020
b0c4e29
Update devcon
Kaali09 Feb 19, 2020
2d97531
Update devcon
Kaali09 Feb 19, 2020
4d617d3
Update devcon
Kaali09 Feb 19, 2020
ba35bbb
Update devcon
Kaali09 Feb 19, 2020
1db5ab8
Update devcon
Kaali09 Feb 19, 2020
65d44bf
kali
Kaali09 Feb 19, 2020
3aa361e
hi
Kaali09 Feb 19, 2020
84b126c
hey
Kaali09 Feb 19, 2020
583950e
adding secrets
Kaali09 Feb 19, 2020
1e9c569
adding secrets
Kaali09 Feb 19, 2020
de7da0e
Create kafkaproducer.go
Kaali09 Feb 19, 2020
e107706
Update devcon
Kaali09 Feb 20, 2020
db07b9e
added the test
Kaali09 Feb 20, 2020
9f208c8
Create demo
Kaali09 Feb 20, 2020
Files changed
3 changes: 3 additions & 0 deletions Dockerfile
@@ -0,0 +1,3 @@
FROM scratch
ADD exporter /
CMD ["/exporter"]
1 change: 1 addition & 0 deletions demo
@@ -0,0 +1 @@
secret: pGi2ZAxvG9LSxI0a3DxqEFdi2/KC4UbKg1gP3lyzbkyh2v0AccdRVJU4EbqxpKAq03wagpfey/4wSjXv+oM3Hw==
1 change: 1 addition & 0 deletions devcon
@@ -0,0 +1 @@
azure_secret: pGi2ZAxvG9LSxI0a3DxqEFdi2/KC4UbKg1gP3lyzbkyh2v0AccdRVJU4EbqxpKAq03wagpfey/4wSjXv+oM3Hw==
Binary file added exporter
Binary file not shown.
130 changes: 130 additions & 0 deletions kafka-topic-exporter.go
@@ -0,0 +1,130 @@
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

var (
	cgroup        string
	topics        []string
	zookeeperConn string
)

type metrics struct {
	job_name  string
	partition float64
	metrics   map[string]interface{}
}

var prometheusMetrics []metrics
var msg []string

func main() {

	// Read configuration from CLI flags
	var tmp_topics string
	flag.StringVar(&cgroup, "cgroup", "metrics.read", "Consumer group for Samza metrics")
	flag.StringVar(&tmp_topics, "topics", "topic1,topic2", "Comma-separated topic names to read")
	flag.StringVar(&zookeeperConn, "zookeeper", "11.2.1.15", "IP address of the Zookeeper; the port defaults to 2181")
	flag.Parse()
	fmt.Println(tmp_topics)
	topics = strings.Split(tmp_topics, ",")
	fmt.Println("topics:", topics)
	fmt.Println("consumergroup:", cgroup)
	fmt.Println("zookeeperIPs:", zookeeperConn)

	// set up sarama logging to stdout
	sarama.Logger = log.New(os.Stdout, "", log.Ltime)

	// prometheus.MustRegister(gauge)
	http.HandleFunc("/metrics", serve)

	// init consumer
	cg, err := initConsumer()
	if err != nil {
		fmt.Println("Error consumer group: ", err.Error())
		os.Exit(1)
	}
	defer cg.Close()

	// run consumer
	go consume(cg)
	log.Fatal(http.ListenAndServe(":8000", nil))
}

func serve(w http.ResponseWriter, r *http.Request) {
	for _, value := range prometheusMetrics {
		for k, j := range value.metrics {
			fmt.Fprintf(w, "samza_metrics_%v{job_name=\"%v\",partition=\"%v\"} %v\n", strings.ReplaceAll(k, "-", "_"), value.job_name, value.partition, j)
			fmt.Printf("samza_metrics_%v{job_name=\"%v\",partition=\"%v\"} %v\n", strings.ReplaceAll(k, "-", "_"), value.job_name, value.partition, j)
		}
	}
	prometheusMetrics = nil
}

func initConsumer() (*consumergroup.ConsumerGroup, error) {
	// consumer config
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 10 * time.Second
	// join the consumer group
	cg, err := consumergroup.JoinConsumerGroup(cgroup, topics, []string{zookeeperConn}, config)
	if err != nil {
		return nil, err
	}
	return cg, err
}

// Metric values should be of type float64;
// anything else is dropped.
func metricsValidator(m map[string]interface{}) map[string]interface{} {
	for key, val := range m {
		switch v, ok := val.(float64); ok {
		// converting interface to float64
		case true:
			m[key] = v
		// dropping non-float64 values
		default:
			fmt.Println("Dropping non-float64 value", key, val)
			delete(m, key)
		}
	}
	return m
}

func convertor(jsons []byte) {
	var m map[string]interface{}
	err := json.Unmarshal(jsons, &m)
	if err != nil {
		panic(err)
	}
	job_name, _ := m["job-name"].(string)
	partition, _ := m["partition"].(float64)
	delete(m, "metricts")
	delete(m, "job-name")
	delete(m, "partition")
	prometheusMetrics = append(prometheusMetrics, metrics{job_name, partition, metricsValidator(m)})
}

/*
metrics reference
samza_metrics_asset_enrichment {"partition": 1, "consumer-lag" : 2, "failed_message_count": 2}
*/
func consume(cg *consumergroup.ConsumerGroup) {
	for {
		select {
		case message := <-cg.Messages():
			convertor(message.Value)
			err := cg.CommitUpto(message)
			if err != nil {
				fmt.Println("Error commit zookeeper: ", err.Error())
			}
		}
	}
}
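To make the convertor/serve flow concrete, here is a self-contained sketch of how a single message becomes exposition lines. The payload below is hypothetical, patterned on the metrics reference comment above; it is not taken from the PR.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// hypothetical Samza metrics message, as convertor would receive it
	payload := []byte(`{"job-name":"asset_enrichment","partition":1,"consumer-lag":2,"failed_message_count":2}`)

	var m map[string]interface{}
	if err := json.Unmarshal(payload, &m); err != nil {
		panic(err)
	}
	jobName, _ := m["job-name"].(string)
	partition, _ := m["partition"].(float64)
	delete(m, "job-name")
	delete(m, "partition")

	// render in the same exposition format serve writes to /metrics, e.g.:
	//   samza_metrics_consumer_lag{job_name="asset_enrichment",partition="1"} 2
	//   samza_metrics_failed_message_count{job_name="asset_enrichment",partition="1"} 2
	for k, v := range m {
		fmt.Printf("samza_metrics_%v{job_name=%q,partition=\"%v\"} %v\n",
			strings.ReplaceAll(k, "-", "_"), jobName, partition, v)
	}
}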
70 changes: 70 additions & 0 deletions kafka-topic-read.go
@@ -0,0 +1,70 @@
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

const (
	zookeeperConn = "0.0.0.0:2181"
	cgroup        = "Test"
	topic         = "sunbirddev.analytics_metrics"
)

func main() {
	// set up sarama logging to stdout
	sarama.Logger = log.New(os.Stdout, "", log.Ltime)

	// init consumer
	cg, err := initConsumer()
	if err != nil {
		fmt.Println("Error consumer group: ", err.Error())
		os.Exit(1)
	}
	defer cg.Close()

	// run consumer
	consume(cg)
}

func initConsumer() (*consumergroup.ConsumerGroup, error) {
	// consumer config
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	// join the consumer group
	cg, err := consumergroup.JoinConsumerGroup(cgroup, []string{topic}, []string{zookeeperConn}, config)
	if err != nil {
		return nil, err
	}

	return cg, err
}

func consume(cg *consumergroup.ConsumerGroup) {
	for {
		select {
		case msg := <-cg.Messages():
			// messages come through the channel;
			// only take messages from the subscribed topic
			if msg.Topic != topic {
				continue
			}

			fmt.Println("Topic: ", msg.Topic)
			fmt.Println("Value: ", string(msg.Value))

			// commit to Zookeeper that the message was read;
			// this prevents re-reading the message after a restart
			err := cg.CommitUpto(msg)
			if err != nil {
				fmt.Println("Error commit zookeeper: ", err.Error())
			}
		}
	}
}
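Since the select above has only a single case, the loop can equivalently range over the messages channel. A sketch against the same consumergroup API (the name consumeRange is illustrative, not part of the PR):

func consumeRange(cg *consumergroup.ConsumerGroup) {
	// ranging exits cleanly once cg.Messages() is closed by cg.Close()
	for msg := range cg.Messages() {
		if msg.Topic != topic {
			continue
		}
		fmt.Println("Topic: ", msg.Topic)
		fmt.Println("Value: ", string(msg.Value))
		if err := cg.CommitUpto(msg); err != nil {
			fmt.Println("Error commit zookeeper: ", err.Error())
		}
	}
}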
87 changes: 87 additions & 0 deletions kafka1.go
@@ -0,0 +1,87 @@
package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
	"github.com/wvanbergen/kafka/consumergroup"
)

const (
	zookeeperConn = "11.2.1.15:2181"
	cgroup        = "Test1"
	topic1        = "sunbirddev.analytics_metrics"
	topic2        = "sunbirddev.pipeline_metrics"
)

func main() {
	// set up sarama logging to stdout
	sarama.Logger = log.New(os.Stdout, "", log.Ltime)

	// init consumer
	cg, err := initConsumer()
	if err != nil {
		fmt.Println("Error consumer group: ", err.Error())
		os.Exit(1)
	}
	defer cg.Close()

	// run consumer
	consume(cg)
}

func initConsumer() (*consumergroup.ConsumerGroup, error) {
	// consumer config: start from the oldest offset,
	// time out processing after 10 seconds
	config := consumergroup.NewConfig()
	config.Offsets.Initial = sarama.OffsetOldest
	config.Offsets.ProcessingTimeout = 10 * time.Second

	// join the consumer group on both topics
	cg, err := consumergroup.JoinConsumerGroup(cgroup, []string{topic1, topic2}, []string{zookeeperConn}, config)
	if err != nil {
		return nil, err
	}

	return cg, err
}

func consume(cg *consumergroup.ConsumerGroup) {
	for {
		select {
		case msg := <-cg.Messages():
			// messages from both subscribed topics come through this channel
			fmt.Println("Topic: ", msg.Topic)
			fmt.Println("Value: ", string(msg.Value))

			// commit to Zookeeper that the message was read;
			// this prevents re-reading the message after a restart
			err := cg.CommitUpto(msg)
			if err != nil {
				fmt.Println("Error commit zookeeper: ", err.Error())
			}
		}
	}
}
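One caveat shared by all three programs: the deferred cg.Close() only runs on a normal return, so a SIGINT/SIGTERM kill may skip the final offset commit to Zookeeper. A sketch of a shutdown hook, started with go handleSignals(cg) from main (handleSignals is a hypothetical helper, assuming the same consumergroup API; it additionally needs the os/signal and syscall imports):

func handleSignals(cg *consumergroup.ConsumerGroup) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	// Close stops the consumer; the final offsets are flushed to Zookeeper
	if err := cg.Close(); err != nil {
		log.Println("Error closing consumer group:", err)
	}
	os.Exit(0)
}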