-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcloudwatch.go
159 lines (143 loc) · 4.17 KB
/
cloudwatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package main
import (
"fmt"
"time"
log "github.com/Sirupsen/logrus"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)
// AWS CloudWatch Logs limits and pacing values.
// Also see http://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/cloudwatch_limits_cwl.html
const (
	// maxBatchEvents is the maximum number of log events in a batch.
	maxBatchEvents = 10 * 1000
	// maxBatchSize is the maximum batch size in bytes (1 MiB).
	maxBatchSize = 1 << 20
	// maxEventSize is the maximum size of a single event in bytes (256 KiB).
	maxEventSize = 256 * 1024
	// maxBatchTimeSpan is 24 hours in milliseconds: a batch of log events in
	// a single PutLogEvents request cannot span more than that.
	maxBatchTimeSpan = 24 * 60 * 60 * 1000
	// eventSizeOverhead is the number of bytes added on top of the message
	// length when the size of a log event is computed.
	eventSizeOverhead = 26
	// describeLogstreamsDelay is the pacing interval for DescribeLogStreams
	// (200 ms, i.e. five transactions per second).
	describeLogstreamsDelay = time.Second / 5
	// putLogEventsDelay is the pacing interval for PutLogEvents
	// (200 ms, i.e. five requests per second per log stream).
	putLogEventsDelay = time.Second / 5
)
// logEvent is a single log message together with its timestamp.
type logEvent struct {
// Message text; its byte length is what size() measures.
msg string
// Timestamp in milliseconds
// (handed unchanged to the CloudWatch InputLogEvent by upload).
timestamp int64
}
// size returns the number of bytes this event counts for: the byte length
// of the message plus the fixed eventSizeOverhead.
func (e *logEvent) size() int {
	msgBytes := len(e.msg)
	return eventSizeOverhead + msgBytes
}
// validate checks the event against the per-event size limit.
// It returns errMessageTooBig when size() exceeds maxEventSize and nil
// otherwise.
func (e *logEvent) validate() error {
	if e.size() <= maxEventSize {
		return nil
	}
	return errMessageTooBig
}
// destination is one CloudWatch Logs group/stream pair together with the
// client and the sequence token used to write to it.
type destination struct {
// Log stream name.
stream string
// Log group name.
group string
// Sequence token sent with the next PutLogEvents request. It is nil until
// setToken or a successful upload assigns a value.
token *string
// CloudWatch Logs client used for every API call made on this destination.
svc *cloudwatchlogs.CloudWatchLogs
}
// newDestination builds a destination for the given log stream and log
// group, backed by the package-level cwlogs client, and tries to load the
// stream's current upload sequence token.
//
// A destination is always returned. When the token lookup fails, the token
// is left nil and the failure is logged rather than silently discarded.
func newDestination(stream, group string) *destination {
	dst := &destination{
		svc:    cwlogs,
		stream: stream,
		group:  group,
	}
	log.Debugf("%s setting token", dst)
	if err := dst.setToken(); err != nil {
		// This constructor has no error result, so the lookup failure is
		// recorded here instead of being dropped on the floor.
		log.Warnf("%s setting token failed: %v", dst, err)
	}
	return dst
}
// upload sends events to the destination's log stream with one PutLogEvents
// call and, on success, stores the sequence token returned for the next call.
// On failure the error is returned as-is and the stored token is unchanged.
// Possible errors http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
func (dst *destination) upload(events eventsList) error {
	batch := make([]*cloudwatchlogs.InputLogEvent, 0, len(events))
	for _, ev := range events {
		entry := &cloudwatchlogs.InputLogEvent{
			Message:   aws.String(ev.msg),
			Timestamp: aws.Int64(ev.timestamp),
		}
		batch = append(batch, entry)
	}

	// When rejectedLogEventsInfo is not empty, the app cannot do anything
	// reasonable with the rejected logs, so it is ignored.
	// Maybe expose some statistics for rejected counters.
	out, err := dst.svc.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
		LogEvents:     batch,
		LogGroupName:  aws.String(dst.group),
		LogStreamName: aws.String(dst.stream),
		SequenceToken: dst.token,
	})
	if err != nil {
		return err
	}
	dst.token = out.NextSequenceToken
	return nil
}
// setToken looks up the upload sequence token of the destination's stream.
// It pages through DescribeLogStreams, using the stream name as a prefix
// filter, and stops as soon as findToken reports an exact name match.
// If no page contains the stream, dst.token is left as it was.
// For newly created log streams, token is an empty string.
func (dst *destination) setToken() error {
	// Returning true asks the pager for another page, so keep going only
	// while the stream has not been found.
	onPage := func(page *cloudwatchlogs.DescribeLogStreamsOutput, _ bool) bool {
		found := findToken(dst, page)
		return !found
	}

	input := &cloudwatchlogs.DescribeLogStreamsInput{
		LogGroupName:        aws.String(dst.group),
		LogStreamNamePrefix: aws.String(dst.stream),
	}
	return dst.svc.DescribeLogStreamsPages(input, onPage)
}
// create makes sure the log group exists and then the log stream, in that
// order. The stream is not attempted when the group step fails.
// If an error is returned, PutLogEvents cannot succeed.
func (dst *destination) create() (err error) {
	if err = dst.createGroup(); err == nil {
		err = dst.createStream()
	}
	return err
}
// createGroup creates the destination's log group. A group that already
// exists counts as success; every other error is returned unchanged.
// http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogGroup.html
func (dst *destination) createGroup() error {
	_, err := dst.svc.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
		LogGroupName: aws.String(dst.group),
	})

	awsErr, isAWS := err.(awserr.Error)
	if isAWS && awsErr.Code() == "ResourceAlreadyExistsException" {
		return nil
	}
	return err
}
// createStream creates the destination's log stream inside its log group.
// A stream that already exists counts as success; every other error is
// returned unchanged.
// http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_CreateLogStream.html
func (dst *destination) createStream() error {
	input := &cloudwatchlogs.CreateLogStreamInput{
		LogGroupName:  aws.String(dst.group),
		LogStreamName: aws.String(dst.stream),
	}
	_, err := dst.svc.CreateLogStream(input)

	switch awsErr, isAWS := err.(awserr.Error); {
	case isAWS && awsErr.Code() == "ResourceAlreadyExistsException":
		return nil
	default:
		return err
	}
}
// String renders the destination as "group: <group> stream: <stream>",
// which is the form used when a destination is passed to a log format.
func (dst *destination) String() string {
	const layout = "group: %s stream: %s"
	group, stream := dst.group, dst.stream
	return fmt.Sprintf(layout, group, stream)
}
// findToken scans one DescribeLogStreams page for the stream whose name is
// exactly dst.stream. On a match it copies that stream's upload sequence
// token into dst.token and returns true; otherwise it returns false and
// leaves dst.token untouched.
//
// The SDK models the page, its rows and every row field as optional
// pointers, so nil values are skipped instead of being dereferenced.
func findToken(dst *destination, page *cloudwatchlogs.DescribeLogStreamsOutput) bool {
	if page == nil {
		return false
	}
	for _, row := range page.LogStreams {
		if row == nil || row.LogStreamName == nil {
			continue
		}
		if dst.stream == *row.LogStreamName {
			dst.token = row.UploadSequenceToken
			return true
		}
	}
	return false
}