-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathselfstat_selfstat.go
160 lines (142 loc) · 4.31 KB
/
selfstat_selfstat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// selfstat is a package for tracking and collecting internal statistics
// about telegraf. Metrics can be registered using this package, and then
// incremented or set within your code. If the inputs.internal plugin is enabled,
// then all registered stats will be collected as they would by any other input
// plugin.
package main
import (
"hash/fnv"
"log"
"sort"
"sync"
"time"
)
var (
registry *rgstry
)
// Stat is an interface for dealing with telegraf statistics collected
// on itself.
type Stat interface {
// Name is the name of the measurement
Name() string
// FieldName is the name of the measurement field
FieldName() string
// Tags is a tag map. Each time this is called a new map is allocated.
Tags() map[string]string
// Key is the unique measurement+tags key of the stat.
Key() uint64
// Incr increments a regular stat by 'v'.
// in the case of a timing stat, increment adds the timing to the cache.
Incr(v int64)
// Set sets a regular stat to 'v'.
// in the case of a timing stat, set adds the timing to the cache.
Set(v int64)
// Get gets the value of the stat. In the case of timings, this returns
// an average value of all timings received since the last call to Get().
// If no timings were received, it returns the previous value.
Get() int64
}
// Register registers the given measurement, field, and tags in the selfstat
// registry. If given an identical measurement, it will return the stat that's
// already been registered.
//
// The returned Stat can be incremented by the consumer of Register(), and it's
// value will be returned as a telegraf metric when Metrics() is called.
func Register(measurement, field string, tags map[string]string) Stat {
return registry.register(&stat{
measurement: "internal_" + measurement,
field: field,
tags: tags,
})
}
// RegisterTiming registers the given measurement, field, and tags in the selfstat
// registry. If given an identical measurement, it will return the stat that's
// already been registered.
//
// Timing stats differ from regular stats in that they accumulate multiple
// "timings" added to them, and will return the average when Get() is called.
// After Get() is called, the average is cleared and the next timing returned
// from Get() will only reflect timings added since the previous call to Get().
// If Get() is called without receiving any new timings, then the previous value
// is used.
//
// In other words, timings are an averaged metric that get cleared on each call
// to Get().
//
// The returned Stat can be incremented by the consumer of Register(), and it's
// value will be returned as a telegraf metric when Metrics() is called.
func RegisterTiming(measurement, field string, tags map[string]string) Stat {
return registry.register(&timingStat{
measurement: "internal_" + measurement,
field: field,
tags: tags,
})
}
// Metrics returns all registered stats as telegraf metrics.
func Metrics() []Metric {
registry.mu.Lock()
now := time.Now()
metrics := make([]Metric, len(registry.stats))
i := 0
for _, stats := range registry.stats {
if len(stats) > 0 {
var tags map[string]string
var name string
fields := map[string]interface{}{}
j := 0
for fieldname, stat := range stats {
if j == 0 {
tags = stat.Tags()
name = stat.Name()
}
fields[fieldname] = stat.Get()
j++
}
metric, err := New(name, tags, fields, now)
if err != nil {
log.Printf("E! Error creating selfstat metric: %s", err)
continue
}
metrics[i] = metric
i++
}
}
registry.mu.Unlock()
return metrics
}
type rgstry struct {
stats map[uint64]map[string]Stat
mu sync.Mutex
}
func (r *rgstry) register(s Stat) Stat {
r.mu.Lock()
defer r.mu.Unlock()
if stats, ok := r.stats[s.Key()]; ok {
// measurement exists
if stat, ok := stats[s.FieldName()]; ok {
// field already exists, so don't create a new one
return stat
}
r.stats[s.Key()][s.FieldName()] = s
return s
} else {
// creating a new unique metric
r.stats[s.Key()] = map[string]Stat{s.FieldName(): s}
return s
}
}
func key(measurement string, tags map[string]string) uint64 {
h := fnv.New64a()
h.Write([]byte(measurement))
tmp := make([]string, len(tags))
i := 0
for k, v := range tags {
tmp[i] = k + v
i++
}
sort.Strings(tmp)
for _, s := range tmp {
h.Write([]byte(s))
}
return h.Sum64()
}