Skip to content

Commit 29dffde

Browse files
authored
Merge pull request #68 from smallpath/master
Fix cacheInterval concurrency access
2 parents 02a4979 + b341c74 commit 29dffde

File tree

2 files changed

+199
-3
lines changed

2 files changed

+199
-3
lines changed

pkg/cachedtransactiongather/cachedtransactiongather.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,10 @@ type cachedTransactionGather struct {
4040
}
4141

4242
func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily, func(), error) {
43-
c.lock.RLock()
43+
c.lock.Lock()
4444
shouldGather := time.Now().After(c.nextCollectionTime)
45-
c.lock.RUnlock()
4645
if shouldGather {
4746
begin := time.Now()
48-
c.lock.Lock()
4947
c.nextCollectionTime = c.nextCollectionTime.Add(c.cacheInterval)
5048
metrics, done, err := c.gather.Gather()
5149
if err != nil {
@@ -60,6 +58,8 @@ func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily
6058
c.lock.Unlock()
6159
duration := time.Since(begin)
6260
level.Info(c.logger).Log("msg", "Collect all products done", "duration_seconds", duration.Seconds())
61+
} else {
62+
c.lock.Unlock()
6363
}
6464
c.lock.RLock()
6565
defer c.lock.RUnlock()
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package cachedtransactiongather
2+
3+
import (
4+
"fmt"
5+
"github.com/prometheus/client_golang/prometheus"
6+
io_prometheus_client "github.com/prometheus/client_model/go"
7+
"github.com/prometheus/common/promlog"
8+
"sort"
9+
"sync"
10+
"testing"
11+
"time"
12+
)
13+
14+
type mockGatherer struct {
15+
sleepUntil time.Duration
16+
}
17+
18+
func (m mockGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) {
19+
fmt.Println("start gather: " + m.sleepUntil.String())
20+
time.Sleep(m.sleepUntil)
21+
fmt.Println("end gather: " + m.sleepUntil.String())
22+
return []*io_prometheus_client.MetricFamily{}, nil
23+
}
24+
25+
func newMockGatherer(duration time.Duration) prometheus.Gatherer {
26+
return &mockGatherer{
27+
sleepUntil: duration,
28+
}
29+
}
30+
31+
type multiTRegistry struct {
32+
tGatherers []prometheus.TransactionalGatherer
33+
}
34+
35+
func newMultiConcurrencyRegistry(tGatherers ...prometheus.TransactionalGatherer) *multiTRegistry {
36+
return &multiTRegistry{
37+
tGatherers: tGatherers,
38+
}
39+
}
40+
41+
// Gather implements TransactionalGatherer interface.
42+
func (r *multiTRegistry) Gather() (mfs []*io_prometheus_client.MetricFamily, done func(), err error) {
43+
dFns := make([]func(), 0, len(r.tGatherers))
44+
wait := sync.WaitGroup{}
45+
wait.Add(len(r.tGatherers))
46+
for i := range r.tGatherers {
47+
go func(i int) {
48+
_, _, _ = r.tGatherers[i].Gather()
49+
wait.Done()
50+
}(i)
51+
}
52+
wait.Wait()
53+
54+
sort.Slice(mfs, func(i, j int) bool {
55+
return *mfs[i].Name < *mfs[j].Name
56+
})
57+
return mfs, func() {
58+
for _, d := range dFns {
59+
d()
60+
}
61+
}, nil
62+
}
63+
64+
func TestCache(t *testing.T) {
65+
promlogConfig := &promlog.Config{}
66+
cacheInterval := 60 * time.Second
67+
logger := promlog.New(promlogConfig)
68+
69+
t.Run("gather with multiple calls should not error", func(t *testing.T) {
70+
gather := NewCachedTransactionGather(
71+
newMultiConcurrencyRegistry(
72+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)),
73+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)),
74+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)),
75+
),
76+
cacheInterval, logger,
77+
)
78+
wait := sync.WaitGroup{}
79+
wait.Add(10)
80+
for range [10]int{} {
81+
go func() {
82+
begin := time.Now()
83+
mfs, done, err := gather.Gather()
84+
defer done()
85+
if err != nil {
86+
logger.Log("err", err)
87+
t.Errorf("gather error: %v", err)
88+
}
89+
logger.Log("mfs", mfs, "done", "err", err)
90+
if time.Since(begin) > cacheInterval {
91+
t.Errorf("gather cost more than cacheInterval %v", time.Since(begin).String())
92+
}
93+
wait.Done()
94+
}()
95+
}
96+
wait.Wait()
97+
})
98+
99+
t.Run("gather success", func(t *testing.T) {
100+
gather := NewCachedTransactionGather(
101+
newMultiConcurrencyRegistry(
102+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)),
103+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)),
104+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)),
105+
),
106+
cacheInterval, logger,
107+
)
108+
wait := sync.WaitGroup{}
109+
wait.Add(3)
110+
go func() {
111+
mfs, done, err := gather.Gather()
112+
defer done()
113+
if err != nil {
114+
logger.Log("err", err)
115+
t.Errorf("gather error: %v", err)
116+
}
117+
logger.Log("mfs", mfs, "done", "err", err)
118+
wait.Done()
119+
}()
120+
go func() {
121+
mfs, done, err := gather.Gather()
122+
defer done()
123+
if err != nil {
124+
logger.Log("err", err)
125+
t.Errorf("gather error: %v", err)
126+
}
127+
logger.Log("mfs", mfs, "done", "err", err)
128+
wait.Done()
129+
}()
130+
go func() {
131+
mfs, done, err := gather.Gather()
132+
defer done()
133+
if err != nil {
134+
logger.Log("err", err)
135+
t.Errorf("gather error: %v", err)
136+
}
137+
logger.Log("mfs", mfs, "done", "err", err)
138+
wait.Done()
139+
}()
140+
wait.Wait()
141+
})
142+
143+
t.Run("gather with 5s step", func(t *testing.T) {
144+
gather := NewCachedTransactionGather(
145+
newMultiConcurrencyRegistry(
146+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)),
147+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)),
148+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)),
149+
),
150+
cacheInterval, logger,
151+
)
152+
wait := sync.WaitGroup{}
153+
wait.Add(10)
154+
for range [10]int{} {
155+
time.Sleep(time.Second * 5)
156+
go func() {
157+
mfs, done, err := gather.Gather()
158+
defer done()
159+
if err != nil {
160+
logger.Log("err", err)
161+
t.Errorf("gather error: %v", err)
162+
}
163+
logger.Log("mfs", mfs, "done", "err", err)
164+
wait.Done()
165+
}()
166+
}
167+
wait.Wait()
168+
})
169+
170+
t.Run("gather with 65s step", func(t *testing.T) {
171+
gather := NewCachedTransactionGather(
172+
newMultiConcurrencyRegistry(
173+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)),
174+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)),
175+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)),
176+
),
177+
cacheInterval, logger,
178+
)
179+
wait := sync.WaitGroup{}
180+
wait.Add(3)
181+
for range [3]int{} {
182+
time.Sleep(time.Second * 65)
183+
go func() {
184+
mfs, done, err := gather.Gather()
185+
defer done()
186+
if err != nil {
187+
logger.Log("err", err)
188+
t.Errorf("gather error: %v", err)
189+
}
190+
logger.Log("mfs", mfs, "done", "err", err)
191+
wait.Done()
192+
}()
193+
}
194+
wait.Wait()
195+
})
196+
}

0 commit comments

Comments
 (0)