http.go
package prommerge

import (
	"fmt"
	"io"
	"log/slog"
	"sync"
	"time"
)
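// AsyncHTTP fetches all configured PromTargets concurrently, streams the
// parsed results through the merge worker, and records the total collection
// time in CollectTargetsDuration.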
func (pd *PromData) AsyncHTTP() error {
	t := time.Now()
	defer func() {
		pd.CollectTargetsDuration = time.Since(t)
		slog.Debug("Targets are collected", slog.String("duration", pd.CollectTargetsDuration.String()), slog.Int("len", len(pd.PromMetrics)))
	}()
	httpWg, parserWg, bodyData, workerPool :=
		new(sync.WaitGroup),
		new(sync.WaitGroup),
		make(chan *PromChanData),
		make(chan struct{}, pd.workerPoolSize)
	pd.PromMetrics = nil
	// Fetch every target concurrently; the worker pool bounds the number of in-flight requests.
	for i := range pd.PromTargets {
		httpWg.Add(1)
		go pd.AHTTP(httpWg, bodyData, workerPool, pd.PromTargets[i])
	}
	// Close bodyData once all HTTP goroutines have finished.
	go func() {
		slog.Debug("Waiting for HTTP routines")
		httpWg.Wait()
		slog.Debug("Close bodyData channel")
		close(bodyData)
		slog.Debug("HTTP routines are done")
	}()
	defer func() {
		close(pd.PromMetricsStream)
		<-pd.MergeWorkerDoneHook
		slog.Debug("Release lock")
	}()
	go pd.MetricsMergeWorker()
	for {
		select {
		case promData, ok := <-bodyData:
			if !ok {
				slog.Debug("bodyData is closed", slog.Int("len(bodyData)", len(bodyData)), slog.Int("len(PromMetricsStream)", len(pd.PromMetricsStream)))
				tM := time.Now()
				parserWg.Wait()
				slog.Debug("Merge routine completed", slog.String("duration", time.Since(tM).String()))
				return nil
			}
			if promData == nil {
				if !pd.SupressErrors {
					slog.Warn("Empty promData")
				}
				continue
			}
			if promData.Err != nil && pd.EmptyOnFailure {
				slog.Debug("Return empty result")
				pd.PromMetrics = nil
				return fmt.Errorf("failed to make async http request, %v", promData.Err)
			}
			if promData.Err != nil && !pd.EmptyOnFailure {
				if !pd.SupressErrors {
					slog.Warn("Skip promData error")
					slog.Error("Failed to make async http request", slog.String("err", promData.Err.Error()))
				}
				continue
			}
			// Send metric to merge worker
			parserWg.Add(1)
			go pd.RouteMetric(parserWg, promData)
		}
	}
}
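// RouteMetric parses a single scrape result and forwards the resulting
// metrics to the merge worker via PromMetricsStream.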
func (pd *PromData) RouteMetric(wg *sync.WaitGroup, promData *PromChanData) {
	defer wg.Done()
	metrics := pd.ParseMetricData(promData.Data, promData.ExtraLabels)
	if metrics != nil {
		pd.PromMetricsStream <- metrics
	}
}
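// MetricsMergeWorker drains PromMetricsStream, appending incoming metrics to
// PromMetrics, and signals MergeWorkerDoneHook once the stream is closed.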
func (pd *PromData) MetricsMergeWorker() {
	for {
		select {
		case metrics, ok := <-pd.PromMetricsStream:
			if !ok {
				slog.Debug("Metrics stream is closed, all messages should be processed", slog.Int("len(PromMetricsStream)", len(pd.PromMetricsStream)))
				pd.MergeWorkerDoneHook <- struct{}{}
				return
			}
			pd.PromMetrics = append(pd.PromMetrics, metrics...)
		}
	}
}
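// AHTTP fetches a single target over HTTP, bounded by the shared worker pool,
// and sends the response body (or the error) to bodyData.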
func (pd *PromData) AHTTP(wg *sync.WaitGroup, bodyData chan *PromChanData, workerPool chan struct{}, target PromTarget) {
	defer wg.Done()
	defer func() {
		slog.Debug("Release worker")
		<-workerPool
	}()
	t := time.Now()
	slog.Debug("Acquire worker")
	workerPool <- struct{}{}
	slog.Debug("Get endpoint", slog.String("url", target.Url))
	response, err := pd.httpClient.Get(target.Url)
	if err != nil {
		bodyData <- &PromChanData{Err: fmt.Errorf("http get error for %s: %v", target.Url, err)}
		return
	}
	// Close the body on every path, including non-2xx responses.
	defer func() {
		err = response.Body.Close()
		if err != nil {
			slog.Error("Failed to close http response body", slog.String("err", err.Error()))
		}
	}()
	if response.StatusCode > 299 {
		bodyData <- &PromChanData{Err: fmt.Errorf("http get failed for %s, response code expected 200, actual %v", target.Url, response.StatusCode)}
		return
	}
	body, err := io.ReadAll(response.Body)
	if err != nil {
		bodyData <- &PromChanData{Err: fmt.Errorf("error reading data from %s: %v", target.Url, err)}
		return
	}
	bodyData <- &PromChanData{
		Data:        string(body),
		ExtraLabels: target.ExtraLabels,
	}
	slog.Debug("Async http executed", slog.String("duration", time.Since(t).String()))
}