// sqs2alertmanager app pulls messages from SQS and posts them to Prometheus Alertmanager as a "Critical" alert
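//
// Example invocation (illustrative values only; the flags are defined in main below):
//
//	sqs2alertmanager \
//		-sqs https://sqs.us-east-1.amazonaws.com/123456789012/alerts1 \
//		-url http://alertmanager:9093 \
//		-r 'alert-(?P<env>\w+)-(?P<service>\w+)-(?P<alarmname>.*)$' \
//		-metrics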
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"regexp"
	"time"

	"github.com/SDFE/sqs2alertmanager/types"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/jpillora/backoff"
	"github.com/pingles/go-metrics-riemann"
	"github.com/rcrowley/go-metrics"
	"github.com/rcrowley/go-metrics/exp"
)

var (
	r   = metrics.NewRegistry() // metric registry
	rhc = metrics.NewRegistry() // healthcheck registry
)
// sendMessage sends a formatted JSON document to the Alertmanager
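// The body b is expected to be a JSON array of alerts for the Alertmanager v1 API,
// roughly of this shape (illustrative; the actual field names come from the struct
// tags in the types package):
//
//	[{
//	    "labels":       {"alertname": "...", "service": "...", "severity": "Critical"},
//	    "annotations":  {"description": "...", "reason": "..."},
//	    "generatorURL": "..."
//	}]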
func sendMessage(alertmanagerURL string, b []byte, ch chan *sqs.Message, metricPrefix string, msg *sqs.Message) {
	cERR := metrics.GetOrRegisterCounter(metricPrefix+".alertmanager_err", r)
	cOK := metrics.GetOrRegisterCounter(metricPrefix+".alertmanager_ok", r)
	response, err := http.Post(alertmanagerURL+"/api/v1/alerts", "application/json", bytes.NewBuffer(b))
	if err != nil {
		cERR.Inc(1)
		close(ch)
		return
	}
	defer response.Body.Close()
	if response.StatusCode < 400 {
		ch <- msg
		cOK.Inc(1)
		// creates a metric for alerts sent to alertmanager, per service
		go func() {
			var alertJSONS []*types.AlertmanagerAlert
			if err := json.Unmarshal(b, &alertJSONS); err != nil || len(alertJSONS) == 0 {
				log.Println("error: ", err)
				return // indexing alertJSONS[0] below would panic on a failed or empty unmarshal
			}
			c := metrics.GetOrRegisterCounter(metricPrefix+".alerts."+alertJSONS[0].Labels.Service, r)
			c.Inc(1)
		}()
	}
	close(ch)
}
// delSqsMessages deletes messages coming in from a *sqs.Message channel.
// Messages on that channel have already been processed and sent off to
// prometheus-alertmanager, so it is safe to delete them.
func delSqsMessages(ch chan *sqs.Message, queueURL *string, awsEndpoint *string, awsRegion *string, metricPrefix *string) {
	cERRdel := metrics.GetOrRegisterCounter(*metricPrefix+".sqs_del_error", r)
	cOKdel := metrics.GetOrRegisterCounter(*metricPrefix+".sqs_del_ok", r)
	var sess *session.Session
	var err error
	// TODO: possibly put this somewhere so we only need 1 session, rather than multiple, but it works for now
	if *awsEndpoint != "" {
		sess, err = session.NewSession(
			&aws.Config{
				Endpoint:   aws.String(*awsEndpoint),
				DisableSSL: aws.Bool(true),
				Region:     aws.String(*awsRegion),
			})
	} else {
		sess, err = session.NewSession(&aws.Config{})
	}
	if err != nil {
		log.Fatalln("error: unable to create session for deletes: ", *queueURL)
	}
	svc := sqs.New(sess)
	for msg := range ch {
		delMsg := &sqs.DeleteMessageInput{QueueUrl: queueURL, ReceiptHandle: msg.ReceiptHandle}
		if _, err := svc.DeleteMessage(delMsg); err != nil {
			cERRdel.Inc(1)
			log.Println("error: ", err)
			continue // a failed delete must not be counted or logged as a success
		}
		cOKdel.Inc(1)
		log.Println("info: deleted", *msg.MessageId)
	}
}
// rcvSqsMessages polls SQS for new messages and retrieves up to 10 messages at a time
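// Received messages are put on sqsMsgChan; SQS will redeliver any message that
// is not deleted within the 120s visibility timeout set below, so the consumer
// must delete processed messages (via delSqsMessages) to make them go away.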
func rcvSqsMessages(sqsURL *string, awsEndpoint *string, awsRegion *string, backOff *backoff.Backoff, sqsMsgChan chan *sqs.Message, metricPrefix string) {
	cERR := metrics.GetOrRegisterCounter(metricPrefix+".sqs_rcv_error", r)
	cOK := metrics.GetOrRegisterCounter(metricPrefix+".sqs_rcv_ok", r)
	var sess *session.Session
	var err error
	sqsParams := &sqs.ReceiveMessageInput{
		QueueUrl:            aws.String(*sqsURL),
		WaitTimeSeconds:     aws.Int64(10), // use long-polling to save money; SQS allows up to 20s
		VisibilityTimeout:   aws.Int64(120),
		MaxNumberOfMessages: aws.Int64(10),
	}
	// TODO: dedupe this as we have it again in delSqsMessages
	if *awsEndpoint != "" {
		sess, err = session.NewSession(
			&aws.Config{
				Endpoint:   aws.String(*awsEndpoint),
				DisableSSL: aws.Bool(true),
				Region:     aws.String(*awsRegion),
			})
	} else {
		sess, err = session.NewSession(&aws.Config{})
	}
	if err != nil {
		log.Fatalln("error: unable to create session: ", *sqsURL)
	}
	// ------------- TODO END --------------
	svc := sqs.New(sess)
	for {
		rec, err := svc.ReceiveMessage(sqsParams)
		if err != nil {
			cERR.Inc(1)
			log.Println("error: ", err)
			time.Sleep(backOff.Duration()) // sleep with exponential backoff & jitter
			continue
		}
		for _, msg := range rec.Messages {
			cOK.Inc(1)
			log.Println("info: received ", *msg.MessageId)
			sqsMsgChan <- msg
		}
	}
}
// appreadyHandler provides /admin/app-ready
func appreadyHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintln(w, "OK")
}
// NewHTTPHealthCheck creates an http healthcheck named "name" against "url".
// An http.StatusCode > 499 marks the check as unhealthy.
func NewHTTPHealthCheck(url *string, name string, timeout time.Duration) {
	// the healthcheck request must complete within `timeout` seconds
	var myClient = &http.Client{Timeout: timeout * time.Second}
	hc := metrics.NewHealthcheck(func(f metrics.Healthcheck) {
		if result, err := myClient.Get(*url); err != nil {
			f.Unhealthy(err)
		} else {
			defer result.Body.Close()
			if result.StatusCode > 499 {
				f.Unhealthy(errors.New(result.Status))
			} else {
				f.Healthy()
			}
		}
	})
	rhc.Register(name, hc)
}
// healthcheckHandler provides the /healthcheck endpoint
func healthcheckHandler(w http.ResponseWriter, req *http.Request) {
	// set the content type explicitly, so the http server
	// doesn't have to guess it (and maybe get it wrong)
	w.Header().Add("Content-Type", "application/json")
	// run healthchecks
	rhc.RunHealthchecks()
	// write the json response
	enc := json.NewEncoder(w)
	enc.Encode(&rhc)
}
// main function
func main() {
	awsEndpoint := flag.String("endpoint", "", "the aws endpoint URL to use")
	awsRegion := flag.String("region", "us-east-1", "the aws region to connect to for pulling from sqs, default: us-east-1")
	sqsURL := flag.String("sqs", "http://localhost:4100/queue/alerts1", "the sqs queue url")
	alertmanagerURL := flag.String("url", "http://localhost:8080", "the http(s):// url to prometheus alertmanager")
	regex := flag.String("r", ``, "regex to match cloudwatch alerts against")
	// metric / stats config
	listenAddress := flag.String("listen-address", ":8888", "the listen address to serve /debug/metrics, /healthcheck and /admin/app-ready on")
	riemannAddress := flag.String("riemann-host", "localhost:5555", "riemann ip:port")
	metricPrefix := flag.String("metric-prefix", "sqs2alertmanager", "metric prefix to be used, app/service name would be a good choice")
	// metric output enable/disable
	metricEnableRiemannOutput := flag.Bool("riemann", false, "send metric data to riemann")
	metricEnableOutput := flag.Bool("metrics", false, "output metric data")
	flag.Parse()
	backOff := &backoff.Backoff{Jitter: true, Min: 5 * time.Second, Max: 5 * time.Minute}
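	// the backoff is used by rcvSqsMessages on receive errors: sleeps grow
	// from 5s towards 5m, with jitter to avoid synchronized retries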
	/*
		create healthchecks
	*/
	NewHTTPHealthCheck(alertmanagerURL, "alertmanager", 2)
	NewHTTPHealthCheck(sqsURL, "aws-sqs", 2)
	/*
		add metrics
	*/
	exp.Exp(r) // expose expvar + go-metrics metrics on /debug/metrics
	if *metricEnableOutput {
		go metrics.Log(r, 60*time.Second, log.New(os.Stderr, "", log.LstdFlags)) // log metrics every 60 seconds
	}
	if *metricEnableRiemannOutput {
		go riemann.Report(r, 10*time.Second, *riemannAddress) // send metrics to riemann every 10 seconds
	}
	// serve
	http.HandleFunc("/admin/app-ready", appreadyHandler) // registered on http.DefaultServeMux
	http.HandleFunc("/healthcheck", healthcheckHandler)
	go http.ListenAndServe(*listenAddress, http.DefaultServeMux) // serves /debug/metrics, /healthcheck and /admin/app-ready
	/*
		create the sqsMsgChan channel and start putting messages on it with rcvSqsMessages
	*/
	sqsMsgChan := make(chan *sqs.Message)
	defer close(sqsMsgChan)
	go rcvSqsMessages(sqsURL, awsEndpoint, awsRegion, backOff, sqsMsgChan, *metricPrefix)
	// compile the alarm-name regex once, before the loop; a bad regex is fatal at startup.
	// named capture groups feed the alert labels below, e.g.:
	// `alert-(?P<env>\w+)-(?P<service>\w+)-(?P<appversion>\d+\-\d+\-\d+\-\d+)-(?P<alarmname>.*)$`
	re, err := regexp.Compile(*regex)
	if err != nil {
		log.Fatalf("error: unable to compile regex: %v\n", err)
	}
	// create the delete channel and a single delSqsMessages worker once,
	// rather than a new channel and go-routine per message
	delCh := make(chan *sqs.Message)
	defer close(delCh)
	go delSqsMessages(delCh, sqsURL, awsEndpoint, awsRegion, metricPrefix) // delete processed messages in a go-routine
	/*
		consume messages from sqsMsgChan
	*/
	for message := range sqsMsgChan {
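		// the SQS body is an SNS notification envelope whose "Message" field carries
		// the CloudWatch alarm JSON as a string, roughly (illustrative; the fields
		// actually consumed are those of types.CloudWatchAlert / types.AlarmData):
		//	{"Type": "Notification", "Message": "{\"AlarmName\": \"...\", \"NewStateReason\": \"...\", ...}"}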
		CWAlert := types.CloudWatchAlert{}
		err := json.Unmarshal([]byte(*message.Body), &CWAlert)
		if err != nil {
			log.Println("error: parsing sqs message: ", err)
			continue
		}
		alertData := CWAlert.Message
		CWAlertData := types.AlarmData{}
		err = json.Unmarshal([]byte(alertData), &CWAlertData)
		if err != nil {
			log.Println("error: parsing sqs message.Message: ", err)
			continue
		}
		asg := ""
		if len(CWAlertData.Trigger.Dimensions) > 0 { // guard: not every alarm carries dimensions
			asg = CWAlertData.Trigger.Dimensions[0].Value
		}
		promAlertAnnotations := types.Annotations{
			Asg:          asg,
			Description:  CWAlertData.AlarmDescription,
			AWSAccountID: CWAlertData.AWSAccountID,
			Reason:       CWAlertData.NewStateReason,
			Region:       CWAlertData.Region,
			Source:       *sqsURL,
		}
		result := re.FindAllStringSubmatch(CWAlertData.AlarmName, -1)
		if len(result) == 0 {
			log.Println("error: alarm name did not match regex: ", CWAlertData.AlarmName)
			continue // indexing result[0] below would panic on a non-match
		}
		n1 := re.SubexpNames() // keeps the names of the matches in the same order as in the given regex
		resultMap := map[string]string{}
		for i, m := range result[0] {
			resultMap[n1[i]] = m
		}
		promAlertLabels := types.Labels{
			Env:        resultMap["env"],
			Alertname:  resultMap["alarmname"],
			Region:     CWAlertData.Region,
			Service:    resultMap["service"],
			RunbookURL: resultMap["runbook"],
			Severity:   "Critical", // Cloudwatch has no concept of severity, so everything is critical for now
		}
		promAlert := types.AlertmanagerAlert{
			Annotations:  promAlertAnnotations,
			GeneratorURL: promAlertLabels.RunbookURL,
			Labels:       promAlertLabels,
		}
		var promAlerts []types.AlertmanagerAlert
		promAlerts = append(promAlerts, promAlert)
		jsonMsg, err := json.MarshalIndent(&promAlerts, "", "\t") // convert promAlerts to valid JSON data
		if err != nil {
			log.Println("error: marshaling alerts: ", err)
			continue
		}
		/*
			send the message to alertmanager & delete sqs messages that come back from ch
		*/
		ch := make(chan *sqs.Message)
		go sendMessage(*alertmanagerURL, jsonMsg, ch, *metricPrefix, message) // send JSON message to alertmanager in a go-routine
		for r := range ch { // consume messages on the ch channel
			log.Println("info: processed", *r.MessageId) // log the processed messageId
			delCh <- r                                   // send the message to delCh (the delete channel)
		}
	}
}