-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.go
195 lines (175 loc) · 6.52 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package main
import (
	"flag"
	"io"
	"math/rand"
	"os"
	"sync"
	"time"

	glogger "github.com/google/logger"
	"inspector/config"
	"inspector/metrics"
	"inspector/mylogger"
	"inspector/probers"
	"inspector/scheduler"
	"inspector/watcher"
)
// Tunables used by main. They are kept as variables (not constants) and keep their original names
// because other code in the package refers to them.
var (
	// METRIC_CHANNEL_POLL_INTERVAL is the period of the ticker on which main asks the metrics
	// database client to emit what it has collected.
	METRIC_CHANNEL_POLL_INTERVAL = 10 * time.Second

	// TARGET_LIST_SCAN_WAIT_INTERVAL is how long main sleeps after one full pass over the configured
	// targets before starting the next pass.
	TARGET_LIST_SCAN_WAIT_INTERVAL = 4 * time.Second

	// PROBER_RESTART_INTERVAL_JITTER_RANGE is the exclusive upper bound, in whole seconds, of the
	// random pause main takes after launching each prober.
	PROBER_RESTART_INTERVAL_JITTER_RANGE = 2

	// METRIC_CHANNEL_SIZE is the buffer capacity of the channel probers push metrics into.
	METRIC_CHANNEL_SIZE = 400
)
// main is the entry point of the inspector.
//
// It parses the command-line flags, sets up logging, loads the configuration and creates the metrics
// database client. It then starts three background goroutines (config file watcher, scheduled task,
// metrics collector) and loops forever: on every pass it applies a pending config change, if any,
// and launches every prober of every target in its own goroutine.
//
// The process exits with status 1 when the config path is missing, the config cannot be read, the
// config defines no time series database, or the metrics database client cannot be created. The same
// applies when a changed config is reloaded.
func main() {
	var configPath = flag.String("config_path", "", "Path to the configuration file. Mandatory argument.")
	var logFilePath = flag.String("log_path", "", "A file where to write logs. Optional argument, defaults to stdout")
	flag.Parse()

	if *logFilePath != "" {
		logFile, err := os.OpenFile(*logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0660)
		if err != nil {
			glogger.Fatalf("Failed to open log file path: %s, error: %s", *logFilePath, err)
		}
		defer logFile.Close()
		mylogger.MainLogger = glogger.Init("InspectorLogger", false, true, logFile)
	} else {
		mylogger.MainLogger = glogger.Init("InspectorLogger", true, true, io.Discard)
	}

	if *configPath == "" {
		mylogger.MainLogger.Errorf("Missing a mandatory argument: config_path. Try -help option for the list of " +
			"supported arguments")
		os.Exit(1)
	}

	c, err := config.NewConfig(*configPath)
	if err != nil {
		mylogger.MainLogger.Errorf("Error reading config: %s", err)
		os.Exit(1)
	}
	// Indexing TimeSeriesDB[0] below would panic on a config without any database entry.
	if len(c.TimeSeriesDB) == 0 {
		mylogger.MainLogger.Errorf("Config %s does not define any time series database", *configPath)
		os.Exit(1)
	}
	mylogger.MainLogger.Infof("Config parsed: %v", c.TimeSeriesDB[0])

	// TODO: enable support for multiple time series databases. For now only the first one from the config is used.
	mdb, err := metrics.NewMetricsDB(c.TimeSeriesDB[0])
	if err != nil {
		mylogger.MainLogger.Errorf("Failed initializing metrics db client with error: %s", err)
		os.Exit(1)
	}
	mylogger.MainLogger.Infof("Initialized metrics database...")

	// stateMu guards c and mdb. The main loop replaces both when the config changes, while the
	// metrics goroutine below reads them concurrently.
	var stateMu sync.Mutex

	// Watch the config file so the inspector can pick up changes while it is running.
	configEventChannel := make(chan string)
	go func() {
		err := watcher.WatchFile(*configPath, configEventChannel)
		if err != nil {
			mylogger.MainLogger.Errorf("Error watching file: %s", err)
			return
		}
	}()

	// TODO: determine what the size of the channel should be.
	metricsChannel := make(chan metrics.SingleMetric, METRIC_CHANNEL_SIZE)

	/*
	 * This is a scheduler configuration that will execute a task at the specified time.
	 * Multiple schedulers can be created for different tasks with varying time schedules.
	 */
	schedule := scheduler.ScheduleOptions{
		TimeZone: "Europe/Istanbul",
		Schedule: "daily",
		Time:     "19:03",
	}
	go func() {
		err := scheduler.ScheduleTask(schedule, func() {
			mylogger.MainLogger.Infof("Scheduled task executed. Scheduled Time: %s %s", schedule.Time, schedule.TimeZone)
			// do something like notifications or aggregated data collecting
		})
		if err != nil {
			mylogger.MainLogger.Errorf("Scheduling error: %s", err)
		}
	}()

	/*
	 * Kick off async metrics collection from the metrics channel. Metrics are pushed into the metrics channel
	 * by probers. Every received metric is tagged with the configured region and handed to the current metrics
	 * database client; on every ticker tick the client is asked to emit.
	 */
	go func() {
		ticker := time.NewTicker(METRIC_CHANNEL_POLL_INTERVAL)
		defer ticker.Stop()
		for {
			select {
			case m := <-metricsChannel:
				// Hold the lock for the whole call so a config reload cannot swap c/mdb midway.
				stateMu.Lock()
				m.Tags["region"] = c.Inspector.Region
				mdb.CollectMetrics(m)
				stateMu.Unlock()
			case <-ticker.C:
				mylogger.MainLogger.Infof("Metrics channel is empty. Emitting metrics...")
				stateMu.Lock()
				mdb.EmitMultiple()
				stateMu.Unlock()
			}
		}
	}()

	/*
	 * Iterate over every target defined in the config, and for each target asynchronously initialize and run the
	 * configured probers. Each prober is handed the metrics channel to push its metrics into.
	 * Probers are not reused and run only once per iteration.
	 *
	 * TODO: as a further optimization, the targets can be partitioned and processed asynchronously. This will help
	 * if the number of targets becomes extremely large, but for now it's not a priority.
	 */
	for {
		// Check configEventChannel (without blocking) to learn about config changes.
		select {
		case event := <-configEventChannel:
			mylogger.MainLogger.Infof("Config event: %s", event)
			// Build the new config and client completely before publishing them, so the metrics
			// goroutine never observes a new config paired with the old client.
			newConfig, err := config.NewConfig(*configPath)
			if err != nil {
				mylogger.MainLogger.Errorf("Error reading config: %s", err)
				os.Exit(1)
			}
			if len(newConfig.TimeSeriesDB) == 0 {
				mylogger.MainLogger.Errorf("Config %s does not define any time series database", *configPath)
				os.Exit(1)
			}
			mylogger.MainLogger.Infof("Config parsed: %v", newConfig.TimeSeriesDB[0])
			newMdb, err := metrics.NewMetricsDB(newConfig.TimeSeriesDB[0])
			if err != nil {
				mylogger.MainLogger.Errorf("Failed initializing metrics db client with error: %s", err)
				os.Exit(1)
			}
			stateMu.Lock()
			c, mdb = newConfig, newMdb
			stateMu.Unlock()
			mylogger.MainLogger.Infof("Initialized metrics database...")
		default:
			mylogger.MainLogger.Infof("No config event.")
		}

		// c is only ever written by this goroutine, so reading it here needs no lock.
		for _, target := range c.Targets {
			// Before Go 1.22 a range variable is shared by all iterations. Copy it so that every
			// goroutine started below keeps the target it was launched for.
			target := target
			for _, proberSubConfig := range target.Probers {
				proberSubConfig := proberSubConfig // per-iteration copy, same reason as above
				go func() {
					prober, err := probers.NewProber(proberSubConfig)
					if err != nil {
						mylogger.MainLogger.Errorf("Failed creating new prober: %s for target: %s, error: %s",
							proberSubConfig.Name, target.Name, err)
						return
					}

					err = prober.Initialize(target.Id, proberSubConfig.Id)
					if err != nil {
						mylogger.MainLogger.Errorf("Failed initializing prober: %s for target: %s, error: %s",
							proberSubConfig.Name, target.Name, err)
						return
					}
					mylogger.MainLogger.Infof("Successfully initialized prober: %s for target: %s",
						proberSubConfig.Name, target.Name)

					err = prober.Connect(metricsChannel)
					if err != nil {
						mylogger.MainLogger.Errorf("Failed prober connection: %s for target: %s, error: %s",
							proberSubConfig.Name, target.Name, err)
						return
					}
					mylogger.MainLogger.Infof("Successful prober connection: %s for target: %s",
						proberSubConfig.Name, target.Name)

					err = prober.RunOnce(metricsChannel)
					if err != nil {
						mylogger.MainLogger.Errorf("Failed running prober: %s for target: %s, error: %s",
							proberSubConfig.Name, target.Name, err)
						return
					}

					err = prober.TearDown()
					if err != nil {
						mylogger.MainLogger.Errorf("Failed tearing down prober: %s for target: %s, error: %s",
							proberSubConfig.Name, target.Name, err)
						return
					}
					mylogger.MainLogger.Infof("Successfully torn down prober: %s for target: %s",
						proberSubConfig.Name, target.Name)
				}()

				// Pause for a random number of whole seconds before launching the next prober.
				jitter := rand.Intn(PROBER_RESTART_INTERVAL_JITTER_RANGE)
				time.Sleep(time.Duration(jitter) * time.Second)
			}
		}

		// Wait before scanning through the targets from scratch
		time.Sleep(TARGET_LIST_SCAN_WAIT_INTERVAL)
	}
}