Skip to content

Commit 9bdd368

Browse files
authored
[adaptive sampling] Clean-up after previous refactoring (jaegertracing#5954)
## Which problem is this PR solving? - Previous refactoring left the code in a mixed up state ## Description of the changes - move provider / aggregator / post-aggregator into separate files named accordingly - do the same for tests - no actual code changes, just moving code around ## How was this change tested? - CI --------- Signed-off-by: Yuri Shkuro <[email protected]>
1 parent 88a0319 commit 9bdd368

File tree

5 files changed

+275
-247
lines changed

5 files changed

+275
-247
lines changed

plugin/sampling/strategyprovider/adaptive/aggregator.go

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ const (
2222
maxProbabilities = 10
2323
)
2424

25+
// aggregator is a kind of trace processor that watches for root spans
26+
// and calculates how many traces per service / per endpoint are being
27+
// produced. It periodically flushes these stats ("throughput") to storage.
28+
//
29+
// It also invokes PostAggregator which actually computes adaptive sampling
30+
// probabilities based on the observed throughput.
2531
type aggregator struct {
2632
sync.Mutex
2733

plugin/sampling/strategyprovider/adaptive/processor.go plugin/sampling/strategyprovider/adaptive/post_aggregator.go

+1-92
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package adaptive
55

66
import (
7-
"context"
87
"errors"
98
"math"
109
"math/rand"
@@ -17,7 +16,6 @@ import (
1716
"github.com/jaegertracing/jaeger/pkg/metrics"
1817
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
1918
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
20-
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2119
"github.com/jaegertracing/jaeger/storage/samplingstore"
2220
)
2321

@@ -58,7 +56,7 @@ type throughputBucket struct {
5856
endTime time.Time
5957
}
6058

61-
// PostAggregator retrieves service throughput over a look back interval and calculates sampling probabilities
59+
// PostAggregator retrieves service throughput over a lookback interval and calculates sampling probabilities
6260
// per operation such that each operation is sampled at a specified target QPS. It achieves this by
6361
// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput
6462
// and generating a probability to match the targetQPS.
@@ -129,16 +127,6 @@ func newPostAggregator(
129127
}, nil
130128
}
131129

132-
// GetSamplingStrategy implements protobuf endpoint for retrieving sampling strategy for a service.
133-
func (p *Provider) GetSamplingStrategy(_ context.Context, service string) (*api_v2.SamplingStrategyResponse, error) {
134-
p.RLock()
135-
defer p.RUnlock()
136-
if strategy, ok := p.strategyResponses[service]; ok {
137-
return strategy, nil
138-
}
139-
return p.generateDefaultSamplingStrategyResponse(), nil
140-
}
141-
142130
// Start initializes and starts the sampling postAggregator which regularly calculates sampling probabilities.
143131
func (p *PostAggregator) Start() error {
144132
p.logger.Info("starting adaptive sampling postAggregator")
@@ -148,52 +136,10 @@ func (p *PostAggregator) Start() error {
148136
return nil
149137
}
150138

151-
func (p *Provider) loadProbabilities() {
152-
// TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization
153-
probabilities, err := p.storage.GetLatestProbabilities()
154-
if err != nil {
155-
p.logger.Warn("failed to initialize probabilities", zap.Error(err))
156-
return
157-
}
158-
p.Lock()
159-
defer p.Unlock()
160-
p.probabilities = probabilities
161-
}
162-
163-
// runUpdateProbabilitiesLoop is a loop that reads probabilities from storage.
164-
// The follower updates its local cache with the latest probabilities and serves them.
165-
func (p *Provider) runUpdateProbabilitiesLoop() {
166-
select {
167-
case <-time.After(addJitter(p.followerRefreshInterval)):
168-
// continue after jitter delay
169-
case <-p.shutdown:
170-
return
171-
}
172-
173-
ticker := time.NewTicker(p.followerRefreshInterval)
174-
defer ticker.Stop()
175-
for {
176-
select {
177-
case <-ticker.C:
178-
// Only load probabilities if this strategy_store doesn't hold the leader lock
179-
if !p.isLeader() {
180-
p.loadProbabilities()
181-
p.generateStrategyResponses()
182-
}
183-
case <-p.shutdown:
184-
return
185-
}
186-
}
187-
}
188-
189139
func (p *PostAggregator) isLeader() bool {
190140
return p.electionParticipant.IsLeader()
191141
}
192142

193-
func (p *Provider) isLeader() bool {
194-
return p.electionParticipant.IsLeader()
195-
}
196-
197143
// addJitter adds a random amount of time. Without jitter, if the host holding the leader
198144
// lock were to die, then all other collectors can potentially wait for a full cycle before
199145
// trying to acquire the lock. With jitter, we can reduce the average amount of time before a
@@ -457,40 +403,3 @@ func (p *PostAggregator) isUsingAdaptiveSampling(
457403
}
458404
return false
459405
}
460-
461-
// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
462-
func (p *Provider) generateStrategyResponses() {
463-
p.RLock()
464-
strategies := make(map[string]*api_v2.SamplingStrategyResponse)
465-
for svc, opProbabilities := range p.probabilities {
466-
opStrategies := make([]*api_v2.OperationSamplingStrategy, len(opProbabilities))
467-
var idx int
468-
for op, probability := range opProbabilities {
469-
opStrategies[idx] = &api_v2.OperationSamplingStrategy{
470-
Operation: op,
471-
ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
472-
SamplingRate: probability,
473-
},
474-
}
475-
idx++
476-
}
477-
strategy := p.generateDefaultSamplingStrategyResponse()
478-
strategy.OperationSampling.PerOperationStrategies = opStrategies
479-
strategies[svc] = strategy
480-
}
481-
p.RUnlock()
482-
483-
p.Lock()
484-
defer p.Unlock()
485-
p.strategyResponses = strategies
486-
}
487-
488-
func (p *Provider) generateDefaultSamplingStrategyResponse() *api_v2.SamplingStrategyResponse {
489-
return &api_v2.SamplingStrategyResponse{
490-
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
491-
OperationSampling: &api_v2.PerOperationSamplingStrategies{
492-
DefaultSamplingProbability: p.InitialSamplingProbability,
493-
DefaultLowerBoundTracesPerSecond: p.MinSamplesPerSecond,
494-
},
495-
}
496-
}

plugin/sampling/strategyprovider/adaptive/processor_test.go plugin/sampling/strategyprovider/adaptive/post_aggregator_test.go

-144
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
package adaptive
55

66
import (
7-
"context"
87
"errors"
98
"testing"
109
"time"
@@ -20,7 +19,6 @@ import (
2019
"github.com/jaegertracing/jaeger/pkg/testutils"
2120
epmocks "github.com/jaegertracing/jaeger/plugin/sampling/leaderelection/mocks"
2221
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
23-
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
2422
smocks "github.com/jaegertracing/jaeger/storage/samplingstore/mocks"
2523
)
2624

@@ -405,113 +403,6 @@ func TestRunCalculationLoop_GetThroughputError(t *testing.T) {
405403
require.NoError(t, agg.Close())
406404
}
407405

408-
func TestLoadProbabilities(t *testing.T) {
409-
mockStorage := &smocks.Store{}
410-
mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
411-
412-
p := &Provider{storage: mockStorage}
413-
require.Nil(t, p.probabilities)
414-
p.loadProbabilities()
415-
require.NotNil(t, p.probabilities)
416-
}
417-
418-
func TestRunUpdateProbabilitiesLoop(t *testing.T) {
419-
mockStorage := &smocks.Store{}
420-
mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
421-
mockEP := &epmocks.ElectionParticipant{}
422-
mockEP.On("Start").Return(nil)
423-
mockEP.On("Close").Return(nil)
424-
mockEP.On("IsLeader").Return(false)
425-
426-
p := &Provider{
427-
storage: mockStorage,
428-
shutdown: make(chan struct{}),
429-
followerRefreshInterval: time.Millisecond,
430-
electionParticipant: mockEP,
431-
}
432-
defer close(p.shutdown)
433-
require.Nil(t, p.probabilities)
434-
require.Nil(t, p.strategyResponses)
435-
go p.runUpdateProbabilitiesLoop()
436-
437-
for i := 0; i < 1000; i++ {
438-
p.RLock()
439-
if p.probabilities != nil && p.strategyResponses != nil {
440-
p.RUnlock()
441-
break
442-
}
443-
p.RUnlock()
444-
time.Sleep(time.Millisecond)
445-
}
446-
p.RLock()
447-
assert.NotNil(t, p.probabilities)
448-
assert.NotNil(t, p.strategyResponses)
449-
p.RUnlock()
450-
}
451-
452-
func TestRealisticRunCalculationLoop(t *testing.T) {
453-
t.Skip("Skipped realistic calculation loop test")
454-
logger := zap.NewNop()
455-
// NB: This is an extremely long test since it uses near realistic (1/6th scale) processor config values
456-
testThroughputs := []*model.Throughput{
457-
{Service: "svcA", Operation: "GET", Count: 10},
458-
{Service: "svcA", Operation: "POST", Count: 9},
459-
{Service: "svcA", Operation: "PUT", Count: 5},
460-
{Service: "svcA", Operation: "DELETE", Count: 20},
461-
}
462-
mockStorage := &smocks.Store{}
463-
mockStorage.On("GetThroughput", mock.AnythingOfType("time.Time"), mock.AnythingOfType("time.Time")).
464-
Return(testThroughputs, nil)
465-
mockStorage.On("GetLatestProbabilities").Return(make(model.ServiceOperationProbabilities), nil)
466-
mockStorage.On("InsertProbabilitiesAndQPS", "host", mock.AnythingOfType("model.ServiceOperationProbabilities"),
467-
mock.AnythingOfType("model.ServiceOperationQPS")).Return(nil)
468-
mockEP := &epmocks.ElectionParticipant{}
469-
mockEP.On("Start").Return(nil)
470-
mockEP.On("Close").Return(nil)
471-
mockEP.On("IsLeader").Return(true)
472-
cfg := Options{
473-
TargetSamplesPerSecond: 1.0,
474-
DeltaTolerance: 0.2,
475-
InitialSamplingProbability: 0.001,
476-
CalculationInterval: time.Second * 10,
477-
AggregationBuckets: 1,
478-
Delay: time.Second * 10,
479-
}
480-
s := NewProvider(cfg, logger, mockEP, mockStorage)
481-
s.Start()
482-
483-
for i := 0; i < 100; i++ {
484-
strategy, _ := s.GetSamplingStrategy(context.Background(), "svcA")
485-
if len(strategy.OperationSampling.PerOperationStrategies) != 0 {
486-
break
487-
}
488-
time.Sleep(250 * time.Millisecond)
489-
}
490-
s.Close()
491-
492-
strategy, err := s.GetSamplingStrategy(context.Background(), "svcA")
493-
require.NoError(t, err)
494-
require.Len(t, strategy.OperationSampling.PerOperationStrategies, 4)
495-
strategies := strategy.OperationSampling.PerOperationStrategies
496-
497-
for _, s := range strategies {
498-
switch s.Operation {
499-
case "GET":
500-
assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
501-
"Already at 1QPS, no probability change")
502-
case "POST":
503-
assert.Equal(t, 0.001, s.ProbabilisticSampling.SamplingRate,
504-
"Within epsilon of 1QPS, no probability change")
505-
case "PUT":
506-
assert.InEpsilon(t, 0.002, s.ProbabilisticSampling.SamplingRate, 0.025,
507-
"Under sampled, double probability")
508-
case "DELETE":
509-
assert.InEpsilon(t, 0.0005, s.ProbabilisticSampling.SamplingRate, 0.025,
510-
"Over sampled, halve probability")
511-
}
512-
}
513-
}
514-
515406
func TestPrependBucket(t *testing.T) {
516407
p := &PostAggregator{Options: Options{AggregationBuckets: 1}}
517408
p.prependThroughputBucket(&throughputBucket{interval: time.Minute})
@@ -547,41 +438,6 @@ func TestConstructorFailure(t *testing.T) {
547438
require.EqualError(t, err, "BucketsForCalculation cannot be less than 1")
548439
}
549440

550-
func TestGenerateStrategyResponses(t *testing.T) {
551-
probabilities := model.ServiceOperationProbabilities{
552-
"svcA": map[string]float64{
553-
"GET": 0.5,
554-
},
555-
}
556-
p := &Provider{
557-
probabilities: probabilities,
558-
Options: Options{
559-
InitialSamplingProbability: 0.001,
560-
MinSamplesPerSecond: 0.0001,
561-
},
562-
}
563-
p.generateStrategyResponses()
564-
565-
expectedResponse := map[string]*api_v2.SamplingStrategyResponse{
566-
"svcA": {
567-
StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC,
568-
OperationSampling: &api_v2.PerOperationSamplingStrategies{
569-
DefaultSamplingProbability: 0.001,
570-
DefaultLowerBoundTracesPerSecond: 0.0001,
571-
PerOperationStrategies: []*api_v2.OperationSamplingStrategy{
572-
{
573-
Operation: "GET",
574-
ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
575-
SamplingRate: 0.5,
576-
},
577-
},
578-
},
579-
},
580-
},
581-
}
582-
assert.Equal(t, expectedResponse, p.strategyResponses)
583-
}
584-
585441
func TestUsingAdaptiveSampling(t *testing.T) {
586442
p := &PostAggregator{}
587443
throughput := serviceOperationThroughput{

0 commit comments

Comments
 (0)