forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.go
300 lines (251 loc) · 7.04 KB
/
graphite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package influxdb
import (
"bufio"
"errors"
"io"
"net"
"strings"
"sync"
log "code.google.com/p/log4go"
)
var (
// ErrBindAddressRequired is returned when starting the GraphiteServer
// without a TCP or UDP listening address.
ErrBindAddressRequired = errors.New("bind address required")
// ErrGraphiteServerClosed return when closing an already closed graphite server.
ErrGraphiteServerClosed = errors.New("graphite server already closed")
)
// GraphiteListener provides a tcp and/or udp listener that you can
// use to ingest metrics into influxdb via the graphite protocol. it
// behaves as a carbon daemon, except:
//
// no rounding of timestamps to the nearest interval. Upon ingestion
// of multiple datapoints for a given key within the same interval
// (possibly but not necessarily the same timestamp), graphite would
// use one (the latest received) value with a rounded timestamp
// representing that interval. We store values for every timestamp we
// receive (only the latest value for a given metric-timestamp pair)
// so it's up to the user to feed the data in proper intervals (and
// use round intervals if you plan to rely on that)
type GraphiteServer struct {
server *Server
mu sync.Mutex
wg sync.WaitGroup
done chan struct{} // close notification
// The TCP address to listen on.
TCPAddr *net.TCPAddr
// The UDP address to listen on.
UDPAddr *net.UDPAddr
// The name of the database to insert data into.
Database string
// The cluster admin authorized to insert the data.
User *User
}
// NewGraphiteServer returns an instance of GraphiteServer attached to a Server.
func NewGraphiteServer(server *Server) *GraphiteServer {
return &GraphiteServer{server: server}
}
// ListenAndServe opens TCP (and optionally a UDP) socket to listen for messages.
func (s *GraphiteServer) ListenAndServe() error {
// Make sure we have either a TCP address or a UDP address.
// Also validate that there is an admin user to insert data as.
if s.TCPAddr == nil && s.UDPAddr == nil {
return ErrBindAddressRequired
} else if s.User != nil {
return ErrUserNotFound
}
// Create a new close notification channel.
done := make(chan struct{}, 0)
s.done = done
// Open the TCP connection.
if s.TCPAddr != nil {
l, err := net.ListenTCP("tcp", s.TCPAddr)
if err != nil {
return err
}
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveTCP(l, done)
}
// Open the UDP connection.
if s.UDPAddr != nil {
l, err := net.ListenUDP("udp", s.UDPAddr)
if err != nil {
return err
}
defer func() { _ = l.Close() }()
s.wg.Add(1)
go s.serveUDP(l, done)
}
return nil
}
// serveTCP handles incoming TCP connection requests.
func (s *GraphiteServer) serveTCP(l *net.TCPListener, done chan struct{}) {
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
l.Close()
}()
// Listen for new TCP connections.
for {
c, err := l.Accept()
if err != nil {
// TODO(benbjohnson): Check for connection closed.
log.Error("GraphiteServer: Accept: ", err)
continue
}
s.wg.Add(1)
go s.handleTCPConn(c)
}
}
func (s *GraphiteServer) handleTCPConn(conn net.Conn) {
defer conn.Close()
defer s.wg.Done()
reader := bufio.NewReader(conn)
for {
err := s.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("GraphiteServer: Client closed graphite connection")
return
}
log.Error("GraphiteServer:", err)
}
}
}
// serveUDP handles incoming UDP messages.
func (s *GraphiteServer) serveUDP(conn *net.UDPConn, done chan struct{}) {
defer s.wg.Done()
// Listen for server close.
go func() {
<-done
conn.Close()
}()
buf := make([]byte, 65536)
for {
// Read from connection.
n, _, err := conn.ReadFromUDP(buf)
if err == io.EOF {
return
} else if err != nil {
log.Warn("GraphiteServer: Error when reading from UDP connection %s", err.Error())
}
// Read in data in a separate goroutine.
s.wg.Add(1)
go s.handleUDPMessage(string(buf[:n]))
}
}
// handleUDPMessage splits a UDP packet by newlines and processes each message.
func (s *GraphiteServer) handleUDPMessage(msg string) {
defer s.wg.Done()
for _, metric := range strings.Split(msg, "\n") {
s.handleMessage(bufio.NewReader(strings.NewReader(metric + "\n")))
}
}
// Close shuts down the server's listeners.
func (s *GraphiteServer) Close() error {
// Notify other goroutines of shutdown.
s.mu.Lock()
if s.done == nil {
s.mu.Unlock()
return ErrGraphiteServerClosed
}
close(s.done)
s.done = nil
s.mu.Unlock()
// Wait for all goroutines to shutdown.
s.wg.Wait()
return nil
}
// handleMessage decodes a graphite message from the reader and sends it to the
// committer goroutine.
func (s *GraphiteServer) handleMessage(r *bufio.Reader) error {
panic("not yet implemented: GraphiteServer.handleMessage()")
/* TEMPORARILY REMOVED FOR PROTOBUFS.
// Decode graphic metric.
m, err := decodeGraphiteMetric(r)
if err != nil {
return err
}
// Convert metric to a field value.
v := &protocol.FieldValue{}
if m.isInt {
v.Int64Value = &m.integerValue
} else {
v.DoubleValue = &m.floatValue
}
// Use a single sequence number to make sure last write wins.
sn := uint64(1)
// Send data point to committer.
p := &protocol.Point{
Timestamp: &m.timestamp,
Values: []*protocol.FieldValue{v},
SequenceNumber: &sn,
}
// Write data to server.
series := &protocol.Series{
Name: &m.name,
Fields: []string{"value"},
Points: []*protocol.Point{p},
}
// TODO: Validate user.
// Look up database.
db := s.server.Database(s.Database)
if db == nil {
return ErrDatabaseNotFound
}
// Write series data to database.
if err := db.WriteSeries(series); err != nil {
return fmt.Errorf("write series data: %s", err)
}
return nil
}
type graphiteMetric struct {
name string
isInt bool
integerValue int64
floatValue float64
timestamp int64
}
// returns err == io.EOF when we hit EOF without any further data
func decodeGraphiteMetric(r *bufio.Reader) (*graphiteMetric, error) {
// Read up to the next newline.
buf, err := r.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return nil, fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
if str == "" {
return nil, err
}
// else we got EOF but also data, so just try to process it as valid data
}
// Break into 3 fields (name, value, timestamp).
fields := strings.Fields(str)
if len(fields) != 3 {
return nil, fmt.Errorf("received '%s' which doesn't have three fields", str)
}
// Create a metric.
m := &graphiteMetric{name: fields[0]}
// Parse value.
v, err := strconv.ParseFloat(fields[1], 64)
if err != nil {
return nil, err
}
// Determine if value is a float or an int.
if i := int64(v); float64(i) == v {
m.integerValue, m.isInt = int64(v), true
} else {
m.floatValue = v
}
// Parse timestamp.
timestamp, err := strconv.ParseUint(fields[2], 10, 32)
if err != nil {
return nil, err
}
m.timestamp = int64(timestamp) * int64(time.Millisecond)
return m, nil
*/
}