-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexporter.go
146 lines (131 loc) · 3.55 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
"github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/ebpf-profiler/libpf"
)
// Producer is implemented by metric sources. Produce appends the
// source's current metrics to the provided slice; a non-nil error
// indicates the producer could not generate its metrics this cycle.
type Producer interface {
	Produce(pmetric.MetricSlice) error
}
// ProducerConfig pairs a Producer with the instrumentation scope name
// under which its metrics are reported.
type ProducerConfig struct {
	// Producer is the metric source invoked on every reporting cycle.
	Producer Producer
	// ScopeName is set as the scope name on the emitted ScopeMetrics.
	ScopeName string
}
// stream bundles an open Arrow metrics gRPC stream with the Arrow
// record producer that encodes pmetric batches for it. The two are
// created and discarded together (see makeStream / report).
type stream struct {
	endpoint      arrowpb.ArrowMetricsService_ArrowMetricsClient
	arrowProducer *arrow_record.Producer
}
// Exporter periodically collects metrics from its registered producers
// and sends them to an OTel Arrow metrics service.
type Exporter struct {
	client arrowpb.ArrowMetricsServiceClient
	// NB: someday we might want to have several producer groups,
	// each of which collects at different intervals.
	// For now we are only collecting one scope (GPU metrics)
	// so the global interval is fine.
	interval time.Duration
	// producers run in registration order on every cycle.
	producers []ProducerConfig
	// resourceAttrs are attached to the Resource of every batch.
	resourceAttrs map[string]any
	// stream is lazily created on first report and reset to nil when a
	// send fails, forcing reconnection on the next attempt.
	stream *stream
}
// NewExporter returns an Exporter that reports through client every
// interval, stamping resourceAttrs onto each batch. The underlying
// stream is opened lazily on the first report.
func NewExporter(client arrowpb.ArrowMetricsServiceClient, interval time.Duration, resourceAttrs map[string]any) Exporter {
	// stream is intentionally left at its zero value (nil); it is
	// created on demand by report.
	e := Exporter{
		client:        client,
		interval:      interval,
		resourceAttrs: resourceAttrs,
	}
	return e
}
// AddProducer registers p so that its Producer is invoked on every
// reporting cycle under the configured scope name.
func (e *Exporter) AddProducer(p ProducerConfig) {
	producers := append(e.producers, p)
	e.producers = producers
}
// makeStream opens a fresh Arrow metrics stream to the collector and
// pairs it with a new Arrow record producer, replacing e.stream.
// On failure, e.stream is left untouched.
func (e *Exporter) makeStream(ctx context.Context) error {
	slog.Debug("making new stream")
	ep, err := e.client.ArrowMetrics(ctx)
	if err != nil {
		return err
	}
	e.stream = &stream{
		endpoint:      ep,
		arrowProducer: arrow_record.NewProducer(),
	}
	return nil
}
// report collects metrics from all registered producers into a single
// pmetric batch and sends it to the collector as Arrow records. If the
// current stream has gone away, it re-creates the stream and retries
// the send once. A producer failure only drops that producer's scope;
// the rest of the batch is still sent.
func (e *Exporter) report(ctx context.Context) error {
	m := pmetric.NewMetrics()
	r := m.ResourceMetrics().AppendEmpty()
	if err := r.Resource().Attributes().FromRaw(e.resourceAttrs); err != nil {
		return err
	}
	for _, p := range e.producers {
		slog.Debug("Running arrow metrics producer", "scope", p.ScopeName)
		s := r.ScopeMetrics().AppendEmpty()
		s.Scope().SetName(p.ScopeName)
		ms := s.Metrics()
		if err := p.Producer.Produce(ms); err != nil {
			slog.Warn("Producer failed to produce metrics", "scope", p.ScopeName, "error", err)
		}
	}
	dpc := m.DataPointCount()
	slog.Debug("About to report arrow metrics", "data points", dpc)
	var err error
	// One initial attempt plus one retry.
	for retriesRemaining := 1; retriesRemaining >= 0; retriesRemaining-- {
		if e.stream == nil {
			if err = e.makeStream(ctx); err != nil {
				// If we failed to create a new stream, don't retry.
				// The point of the retry loop is to handle the stream
				// legitimately going away e.g. due to the server
				// having specified max_connection_age,
				// not unexpected issues creating a new stream.
				break
			}
		}
		var arrow *arrowpb.BatchArrowRecords
		arrow, err = e.stream.arrowProducer.BatchArrowRecordsFromMetrics(m)
		if err != nil {
			slog.Warn("Error on produce", "error", err)
			// Drop the stream so the retry reconnects from scratch.
			e.stream = nil
			continue
		}
		if err = e.stream.endpoint.Send(arrow); err != nil {
			slog.Warn("Error on send", "error", err)
			e.stream = nil
			continue
		}
		// Fixed: success was previously logged at Warn level.
		slog.Debug("Send succeeded", "data points", dpc)
		break
	}
	return err
}
// Start runs the reporting loop until ctx is cancelled: every interval
// (re-jittered by ±20% after each cycle to avoid synchronized bursts)
// it calls report. It returns an error immediately if no producers
// have been registered, and nil on context cancellation.
func (e *Exporter) Start(ctx context.Context) error {
	slog.Info("running arrow metrics exporter", "producers", len(e.producers))
	if len(e.producers) == 0 {
		return errors.New("No producers configured")
	}
	tick := time.NewTicker(e.interval)
	defer tick.Stop()
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-tick.C:
			if err := e.report(ctx); err != nil {
				// Fixed: this previously built an error with fmt.Errorf
				// and discarded the result, silently dropping failures.
				slog.Error(fmt.Sprintf("Failed to send arrow metrics: %v", err))
			}
			tick.Reset(libpf.AddJitter(e.interval, 0.2))
		}
	}
}