From cc1510b2d6e263bf69197fb17ccc20a531c05bd6 Mon Sep 17 00:00:00 2001 From: andreaxia Date: Wed, 10 Aug 2022 11:01:58 +0800 Subject: [PATCH 1/3] add cached transaction gather test --- .../cachedtransactiongather.go | 6 +- .../cachedtransactiongather_test.go | 134 ++++++++++++++++++ 2 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 pkg/cachedtransactiongather/cachedtransactiongather_test.go diff --git a/pkg/cachedtransactiongather/cachedtransactiongather.go b/pkg/cachedtransactiongather/cachedtransactiongather.go index 7938c38..e4d0b62 100644 --- a/pkg/cachedtransactiongather/cachedtransactiongather.go +++ b/pkg/cachedtransactiongather/cachedtransactiongather.go @@ -40,12 +40,10 @@ type cachedTransactionGather struct { } func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily, func(), error) { - c.lock.RLock() + c.lock.Lock() shouldGather := time.Now().After(c.nextCollectionTime) - c.lock.RUnlock() if shouldGather { begin := time.Now() - c.lock.Lock() c.nextCollectionTime = c.nextCollectionTime.Add(c.cacheInterval) metrics, done, err := c.gather.Gather() if err != nil { @@ -60,6 +58,8 @@ func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily c.lock.Unlock() duration := time.Since(begin) level.Info(c.logger).Log("msg", "Collect all products done", "duration_seconds", duration.Seconds()) + } else { + c.lock.Unlock() } c.lock.RLock() defer c.lock.RUnlock() diff --git a/pkg/cachedtransactiongather/cachedtransactiongather_test.go b/pkg/cachedtransactiongather/cachedtransactiongather_test.go new file mode 100644 index 0000000..143bf53 --- /dev/null +++ b/pkg/cachedtransactiongather/cachedtransactiongather_test.go @@ -0,0 +1,134 @@ +package cachedtransactiongather + +import ( + "fmt" + "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/promlog" + "sort" + "sync" + "testing" + "time" +) + +type mockGatherer struct { + sleepUntil time.Duration +} + +func (m mockGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) { + fmt.Println("start gather: " + m.sleepUntil.String()) + time.Sleep(m.sleepUntil) + fmt.Sprintf("end gather: " + m.sleepUntil.String()) + return []*io_prometheus_client.MetricFamily{}, nil +} + +func newMockGatherer(duration time.Duration) prometheus.Gatherer { + return &mockGatherer{ + sleepUntil: duration, + } +} + +type multiTRegistry struct { + tGatherers []prometheus.TransactionalGatherer +} + +func newMultiConcurrencyRegistry(tGatherers ...prometheus.TransactionalGatherer) *multiTRegistry { + return &multiTRegistry{ + tGatherers: tGatherers, + } +} + +// Gather implements TransactionalGatherer interface. +func (r *multiTRegistry) Gather() (mfs []*io_prometheus_client.MetricFamily, done func(), err error) { + dFns := make([]func(), 0, len(r.tGatherers)) + wait := sync.WaitGroup{} + wait.Add(len(r.tGatherers)) + for i := range r.tGatherers { + go func(i int) { + _, _, _ = r.tGatherers[i].Gather() + wait.Done() + }(i) + } + wait.Wait() + + sort.Slice(mfs, func(i, j int) bool { + return *mfs[i].Name < *mfs[j].Name + }) + return mfs, func() { + for _, d := range dFns { + d() + } + }, nil +} + +func TestCache(t *testing.T) { + promlogConfig := &promlog.Config{} + cacheInterval := 60 * time.Second + logger := promlog.New(promlogConfig) + gather := NewCachedTransactionGather( + newMultiConcurrencyRegistry( + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), + ), + cacheInterval, logger, + ) + + t.Run("gather with multiple calls should not error", func(t *testing.T) { + wait := sync.WaitGroup{} + wait.Add(10) + for range [10]int{} { + go func() { + begin := time.Now() + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + if time.Since(begin) > cacheInterval { + t.Errorf("gather cost more than cacheInterval %v", time.Since(begin).String()) + } + wait.Done() + }() + } + wait.Wait() + }) + + t.Run("gather success", func(t *testing.T) { + wait := sync.WaitGroup{} + wait.Add(3) + go func() { + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + wait.Done() + }() + go func() { + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + wait.Done() + }() + go func() { + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + wait.Done() + }() + wait.Wait() + }) +} From 4963673934ae7696b5d5e3241ae7c1358dcc0ed9 Mon Sep 17 00:00:00 2001 From: andreaxia Date: Wed, 10 Aug 2022 11:10:24 +0800 Subject: [PATCH 2/3] fix fmt --- pkg/cachedtransactiongather/cachedtransactiongather_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cachedtransactiongather/cachedtransactiongather_test.go b/pkg/cachedtransactiongather/cachedtransactiongather_test.go index 143bf53..61299f3 100644 --- a/pkg/cachedtransactiongather/cachedtransactiongather_test.go +++ b/pkg/cachedtransactiongather/cachedtransactiongather_test.go @@ -18,7 +18,7 @@ type mockGatherer struct { func (m mockGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) { fmt.Println("start gather: " + m.sleepUntil.String()) time.Sleep(m.sleepUntil) - fmt.Sprintf("end gather: " + m.sleepUntil.String()) + fmt.Println("end gather: " + m.sleepUntil.String()) return []*io_prometheus_client.MetricFamily{}, nil } From b341c7415db8eb40e9bbdcc276c0513273484ca4 Mon Sep 17 00:00:00 2001 From: andreaxia Date: Wed, 10 Aug 2022 11:27:59 +0800 Subject: [PATCH 3/3] add step test --- .../cachedtransactiongather_test.go | 78 +++++++++++++++++-- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/pkg/cachedtransactiongather/cachedtransactiongather_test.go b/pkg/cachedtransactiongather/cachedtransactiongather_test.go index 61299f3..79e5d5b 100644 --- a/pkg/cachedtransactiongather/cachedtransactiongather_test.go +++ b/pkg/cachedtransactiongather/cachedtransactiongather_test.go @@ -65,16 +65,16 @@ func TestCache(t *testing.T) { promlogConfig := &promlog.Config{} cacheInterval := 60 * time.Second logger := promlog.New(promlogConfig) - gather := NewCachedTransactionGather( - newMultiConcurrencyRegistry( - prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), - prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), - prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), - ), - cacheInterval, logger, - ) t.Run("gather with multiple calls should not error", func(t *testing.T) { + gather := NewCachedTransactionGather( + newMultiConcurrencyRegistry( + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), + ), + cacheInterval, logger, + ) wait := sync.WaitGroup{} wait.Add(10) for range [10]int{} { @@ -97,6 +97,14 @@ func TestCache(t *testing.T) { }) t.Run("gather success", func(t *testing.T) { + gather := NewCachedTransactionGather( + newMultiConcurrencyRegistry( + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), + ), + cacheInterval, logger, + ) wait := sync.WaitGroup{} wait.Add(3) go func() { @@ -131,4 +139,58 @@ func TestCache(t *testing.T) { }() wait.Wait() }) + + t.Run("gather with 5s step", func(t *testing.T) { + gather := NewCachedTransactionGather( + newMultiConcurrencyRegistry( + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), + ), + cacheInterval, logger, + ) + wait := sync.WaitGroup{} + wait.Add(10) + for range [10]int{} { + time.Sleep(time.Second * 5) + go func() { + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + wait.Done() + }() + } + wait.Wait() + }) + + t.Run("gather with 65s step", func(t *testing.T) { + gather := NewCachedTransactionGather( + newMultiConcurrencyRegistry( + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)), + prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)), + ), + cacheInterval, logger, + ) + wait := sync.WaitGroup{} + wait.Add(3) + for range [3]int{} { + time.Sleep(time.Second * 65) + go func() { + mfs, done, err := gather.Gather() + defer done() + if err != nil { + logger.Log("err", err) + t.Errorf("gather error: %v", err) + } + logger.Log("mfs", mfs, "done", "err", err) + wait.Done() + }() + } + wait.Wait() + }) }