forked from CloudKarafka/cloudkarafka-manager
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.go
142 lines (131 loc) · 3.98 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"time"
"github.com/cloudkarafka/cloudkarafka-manager/config"
"github.com/cloudkarafka/cloudkarafka-manager/db"
"github.com/cloudkarafka/cloudkarafka-manager/metrics"
"github.com/cloudkarafka/cloudkarafka-manager/server"
"github.com/cloudkarafka/cloudkarafka-manager/zookeeper"
)
// Command-line flags. Each variable is a pointer whose value is populated by
// flag.Parse in main and then copied into the config/metrics packages.
var (
// port is copied to config.Port; per its help text it is the HTTP server port.
port = flag.String("port", "8080", "Port to run HTTP server on")
// auth is copied to config.AuthType and also passed to zookeeper.SetAuthentication.
auth = flag.String("authentication", "scram", "Valid values are (none|none-with-write|scram)")
// retention (hours) is copied to config.Retention; main only stores metrics
// when it is > 0 and uses it to compute the cutoff passed to db.Cleaner.
retention = flag.Int("retention", 12, "Retention period (in hours) for historic data, set to 0 to disable history")
// requestTimeout (milliseconds) is converted to a time.Duration and stored in
// config.JMXRequestTimeout.
requestTimeout = flag.Int("request-timeout", 500, "Timeout in ms for requests to brokers to fetch metrics")
// printJMXQueries is copied to metrics.TimeRequests.
printJMXQueries = flag.Bool("print-jmx-queries", false, "Print all JMX requests to the broker")
// zk is split on "," and stored in config.ZookeeperURL, which is then handed
// to zookeeper.Connect.
zk = flag.String("zookeeper", "localhost:2181", "The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.")
// kafkaDir is copied to config.KafkaDir.
kafkaDir = flag.String("kafkadir", "/opt/kafka", "The directory where kafka lives")
)
// TODO: Handle brokers going offline.....

// getBrokerUrls asks ZooKeeper for the list of broker ids and looks up the
// host and port of each one, returning them keyed by broker id.
//
// If listing the ids or looking up any single broker fails, the entries
// gathered up to that point are returned together with the error.
func getBrokerUrls() (map[int]config.HostPort, error) {
	urls := map[int]config.HostPort{}

	brokerIDs, listErr := zookeeper.Brokers()
	if listErr != nil {
		return urls, listErr
	}

	for _, brokerID := range brokerIDs {
		info, lookupErr := zookeeper.Broker(brokerID)
		if lookupErr != nil {
			// Stop at the first broker that cannot be resolved.
			return urls, lookupErr
		}
		urls[brokerID] = config.HostPort{
			Host: info.Host,
			Port: info.Port,
		}
	}

	return urls, nil
}
// watchBrokers keeps config.BrokerUrls up to date with the children of the
// "/brokers/ids" ZooKeeper node. It is intended to run in its own goroutine:
// it installs a watch, rebuilds the broker map, then blocks until the watch
// fires and starts over. It returns when the watch cannot be installed or
// when the event channel is closed.
//
// Which ids end up in the map depends on how the child count compares with
// the current size of config.BrokerUrls:
//   - more children than known brokers: the ids are taken from ZooKeeper;
//   - otherwise: the already known ids are kept, so a broker whose lookup now
//     fails stays in the map with Host "" and Port -1.
func watchBrokers() {
	// Loop instead of recursing so the stack does not grow by one frame for
	// every change notification.
	for {
		children, _, events, err := zookeeper.WatchChildren("/brokers/ids")
		if err != nil {
			// Without a successful watch there is nothing to wait on; report
			// the failure instead of silently blocking on the event channel.
			fmt.Fprintf(os.Stderr, "[ERROR] Could not watch /brokers/ids: %v\n", err)
			return
		}

		previous := len(config.BrokerUrls)
		current := len(children)

		var ids []int
		if current > previous {
			for _, child := range children {
				id, err := strconv.Atoi(child)
				if err != nil {
					// A malformed child name must not be registered as broker 0.
					fmt.Fprintf(os.Stderr, "[WARN] Ignoring non-numeric broker id %q\n", child)
					continue
				}
				ids = append(ids, id)
			}
		} else {
			for id := range config.BrokerUrls {
				ids = append(ids, id)
			}
		}
		fmt.Fprintf(os.Stderr, "[INFO] Number of brokers changed: previous=%d, now=%d\n", previous, current)

		urls := make(map[int]config.HostPort, len(ids))
		for _, id := range ids {
			broker, err := zookeeper.Broker(id)
			if err != nil {
				// Keep the id but mark the broker as unreachable.
				broker = zookeeper.B{Host: "", Port: -1}
			}
			urls[id] = config.HostPort{Host: broker.Host, Port: broker.Port}
		}
		fmt.Fprintf(os.Stderr, "[INFO] Using brokers: %v\n", urls)
		config.BrokerUrls = urls

		// Block until the watch fires; a closed channel ends the watcher.
		if _, ok := <-events; !ok {
			return
		}
	}
}
// main parses the command-line flags into the config package, connects to
// ZooKeeper and the database, starts the background workers (broker watcher,
// periodic metrics collection and cleanup, HTTP server) and then blocks until
// an interrupt signal arrives, after which it shuts everything down.
func main() {
	flag.Parse()

	signals := make(chan os.Signal, 1)
	quit := make(chan bool)
	signal.Notify(signals, os.Interrupt)

	// Publish the parsed flags to the rest of the application.
	config.Retention = int64(*retention)
	config.Port = *port
	config.AuthType = *auth
	config.JMXRequestTimeout = time.Duration(*requestTimeout) * time.Millisecond
	config.KafkaDir = *kafkaDir
	config.ZookeeperURL = strings.Split(*zk, ",")
	config.PrintConfig()

	zookeeper.Connect(config.ZookeeperURL)
	zookeeper.SetAuthentication(*auth)
	metrics.TimeRequests = *printJMXQueries
	go watchBrokers()

	// log.Fatalf terminates the process with status 1 itself.
	if err := db.Connect(); err != nil {
		log.Fatalf("[ERROR] Could not connect to DB: %s\n", err)
	}

	hourly := time.NewTicker(time.Hour)
	metricsTicker := time.NewTicker(60 * time.Second)
	defer hourly.Stop()
	defer metricsTicker.Stop()

	// Background worker: every minute fetch and store topic and broker
	// metrics (only when retention is enabled), and every hour call
	// db.Cleaner with a cutoff config.Retention hours in the past.
	go func() {
		for {
			select {
			case <-quit:
				return
			case <-metricsTicker.C:
				if config.Retention > 0 {
					metrics.FetchAndStoreMetrics(metrics.TopicBeans, func(v metrics.Metric) string {
						return fmt.Sprintf("topic_metrics/%s/%s/%d", v.Topic, v.Name, v.Broker)
					})
					metrics.FetchAndStoreMetrics(metrics.BrokerBeans, func(v metrics.Metric) string {
						return fmt.Sprintf("broker_metrics/%d/%s", v.Broker, v.Name)
					})
				}
			case <-hourly.C:
				db.Cleaner(time.Now().Add(time.Hour * time.Duration(config.Retention) * -1))
			}
		}
	}()

	// HTTP server
	go server.Start()

	// Wait for the interrupt signal.
	<-signals
	fmt.Println("[INFO] Closing down...")

	// Arm the watchdog before the handshake below: quit is unbuffered, so the
	// send blocks until the worker is back in its select. If the worker is
	// stuck, the watchdog must already be running to force the exit.
	time.AfterFunc(2*time.Second, func() {
		log.Fatal("[ERROR] could not exit in reasonable time")
	})
	quit <- true

	zookeeper.Stop()
	db.Close()
	fmt.Println("[INFO] Stopped successfully")
}