forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
parser.go
96 lines (82 loc) · 2.39 KB
/
parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package prometheus
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/parsers"
)
func AcceptsContent(header http.Header) bool {
return expfmt.ResponseFormat(header).FormatType() != expfmt.TypeUnknown
}
type Parser struct {
IgnoreTimestamp bool `toml:"prometheus_ignore_timestamp"`
MetricVersion int `toml:"prometheus_metric_version"`
Header http.Header `toml:"-"` // set by the prometheus input
DefaultTags map[string]string `toml:"-"`
Log telegraf.Logger `toml:"-"`
}
func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) {
// Determine the metric transport-type derived from the response header and
// create a matching decoder.
format := expfmt.ResponseFormat(p.Header)
switch format.FormatType() {
case expfmt.TypeProtoText:
// Make sure we have a finishing newline but no trailing one
data = bytes.TrimPrefix(data, []byte("\n"))
if !bytes.HasSuffix(data, []byte("\n")) {
data = append(data, []byte("\n")...)
}
case expfmt.TypeUnknown:
p.Log.Debugf("Unknown format %q... Trying to continue...", p.Header.Get("Content-Type"))
}
buf := bytes.NewBuffer(data)
decoder := expfmt.NewDecoder(buf, format)
// Decode the input data into prometheus metrics
var metrics []telegraf.Metric
for {
var mf dto.MetricFamily
if err := decoder.Decode(&mf); err != nil {
if errors.Is(err, io.EOF) {
break
}
return nil, fmt.Errorf("decoding response failed: %w", err)
}
switch p.MetricVersion {
case 0, 2:
metrics = append(metrics, p.extractMetricsV2(&mf)...)
case 1:
metrics = append(metrics, p.extractMetricsV1(&mf)...)
default:
return nil, fmt.Errorf("unknown prometheus metric version %d", p.MetricVersion)
}
}
return metrics, nil
}
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}
if len(metrics) < 1 {
return nil, errors.New("no metrics in line")
}
if len(metrics) > 1 {
return nil, errors.New("more than one metric in line")
}
return metrics[0], nil
}
func init() {
parsers.Add("prometheus",
func(string) telegraf.Parser {
return &Parser{}
},
)
}