Skip to content

Commit

Permalink
Add a TotalThroughputSampler. (#185)
Browse files Browse the repository at this point in the history
* Add a TotalThroughputSampler to try to avoid being rate limited when we hit sudden peaks
  • Loading branch information
magnusstahre authored Oct 28, 2020
1 parent c621909 commit 96843de
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 1 deletion.
6 changes: 5 additions & 1 deletion config/file_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type configContents struct {
Logger string `validate:"required,oneof= logrus honeycomb"`
LoggingLevel string `validate:"required"`
Collector string `validate:"required,oneof= InMemCollector"`
Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler"`
Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler TotalThroughputSampler"`
Metrics string `validate:"required,oneof= prometheus honeycomb"`
SendDelay time.Duration `validate:"required"`
TraceTimeout time.Duration `validate:"required"`
Expand Down Expand Up @@ -481,6 +481,8 @@ func (f *fileConfig) GetSamplerConfigForDataset(dataset string) (interface{}, er
i = &EMADynamicSamplerConfig{}
case "RulesBasedSampler":
i = &RulesBasedSamplerConfig{}
case "TotalThroughputSampler":
i = &TotalThroughputSamplerConfig{}
default:
return nil, errors.New("No Sampler found")
}
Expand All @@ -502,6 +504,8 @@ func (f *fileConfig) GetSamplerConfigForDataset(dataset string) (interface{}, er
i = &EMADynamicSamplerConfig{}
case "RulesBasedSampler":
i = &RulesBasedSamplerConfig{}
case "TotalThroughputSampler":
i = &TotalThroughputSamplerConfig{}
default:
return nil, errors.New("No Sampler found")
}
Expand Down
9 changes: 9 additions & 0 deletions config/sampler_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,12 @@ type EMADynamicSamplerConfig struct {
AddSampleRateKeyToTrace bool
AddSampleRateKeyToTraceField string
}

// TotalThroughputSamplerConfig holds the settings read from the config file
// for the TotalThroughputSampler.
type TotalThroughputSamplerConfig struct {
// GoalThroughputPerSec is the target throughput handed to the underlying
// dynsampler. Validation requires it to be at least 1.
GoalThroughputPerSec int64 `validate:"gte=1"`
// ClearFrequencySec is the clear interval, in seconds, handed to the
// underlying dynsampler. The sampler's Start substitutes 30 when this is
// left at zero.
ClearFrequencySec int64
// FieldList names the span fields used to build the sampling key. It is
// required and is passed to newTraceKey.
FieldList []string `validate:"required"`
// UseTraceLength is passed to newTraceKey; presumably it folds the number
// of spans in the trace into the key (confirm against traceKey).
UseTraceLength bool
// AddSampleRateKeyToTrace is passed to newTraceKey; when set, the computed
// key is written onto the trace's spans under AddSampleRateKeyToTraceField.
AddSampleRateKeyToTrace bool
// AddSampleRateKeyToTraceField is the span field name that receives the
// sampling key when AddSampleRateKeyToTrace is enabled.
AddSampleRateKeyToTraceField string
}
4 changes: 4 additions & 0 deletions sample/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (s *SamplerFactory) GetSamplerImplementationForDataset(dataset string) Samp
ds := &RulesBasedSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics}
ds.Start()
sampler = ds
case *config.TotalThroughputSamplerConfig:
ds := &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics}
ds.Start()
sampler = ds
default:
s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c)
os.Exit(1)
Expand Down
76 changes: 76 additions & 0 deletions sample/totalthroughput.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package sample

import (
"math/rand"

dynsampler "github.com/honeycombio/dynsampler-go"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/types"
)

// TotalThroughputSampler makes per-trace sampling decisions by delegating
// rate calculation to a dynsampler.TotalThroughput instance keyed on fields
// extracted from the trace. Call Start before GetSampleRate.
type TotalThroughputSampler struct {
// Config, Logger and Metrics are supplied by the caller before Start.
Config *config.TotalThroughputSamplerConfig
Logger logger.Logger
Metrics metrics.Metrics

// Effective values copied from Config by Start after defaults are applied.
goalThroughputPerSec int64
clearFrequencySec int64

// key builds the sampling key for a trace; created in Start.
key *traceKey

// dynsampler is the underlying rate calculator; created and started in Start.
dynsampler dynsampler.Sampler
}

// Start applies defaults to the sampler configuration, builds the trace key
// generator, registers the metrics this sampler emits, and starts the
// underlying dynsampler.TotalThroughput sampler.
//
// A GoalThroughputPerSec below 1 is replaced with 100 and a ClearFrequencySec
// below 1 is replaced with 30; both replacements are written back to d.Config.
// It returns any error reported by the underlying dynsampler's Start.
func (d *TotalThroughputSampler) Start() error {
	d.Logger.Debug().Logf("Starting TotalThroughputSampler")
	defer func() { d.Logger.Debug().Logf("Finished starting TotalThroughputSampler") }()

	if d.Config.GoalThroughputPerSec < 1 {
		d.Logger.Debug().Logf("configured goal throughput per second for total throughput sampler was %d; forcing to 100", d.Config.GoalThroughputPerSec)
		d.Config.GoalThroughputPerSec = 100
	}
	d.goalThroughputPerSec = d.Config.GoalThroughputPerSec

	// Treat any non-positive interval as "unset": a negative value would
	// otherwise be handed to dynsampler, which drives a ticker with it.
	if d.Config.ClearFrequencySec < 1 {
		d.Config.ClearFrequencySec = 30
	}
	d.clearFrequencySec = d.Config.ClearFrequencySec

	d.key = newTraceKey(d.Config.FieldList, d.Config.UseTraceLength, d.Config.AddSampleRateKeyToTrace, d.Config.AddSampleRateKeyToTraceField)

	// Register statistics this package will produce. This is done before the
	// dynsampler is started so the metrics exist even if startup fails.
	d.Metrics.Register("dynsampler_num_dropped", "counter")
	d.Metrics.Register("dynsampler_num_kept", "counter")
	d.Metrics.Register("dynsampler_sample_rate", "histogram")

	// spin up the actual dynamic sampler
	d.dynsampler = &dynsampler.TotalThroughput{
		GoalThroughputPerSec: int(d.goalThroughputPerSec),
		ClearFrequencySec:    int(d.clearFrequencySec),
	}
	if err := d.dynsampler.Start(); err != nil {
		return err
	}

	return nil
}

// GetSampleRate computes the sampling key for trace, asks the underlying
// dynsampler for that key's current rate, and rolls a random keep/drop
// decision with probability 1/rate. It returns the rate used and whether the
// trace should be kept, and records the outcome in the dynsampler_* metrics.
func (d *TotalThroughputSampler) GetSampleRate(trace *types.Trace) (uint, bool) {
	sampleKey := d.key.buildAndAdd(trace)

	sampleRate := d.dynsampler.GetSampleRate(sampleKey)
	if sampleRate < 1 {
		// A rate below 1 should never come back from dynsampler, but guard
		// against it so the random draw below always has a valid bound.
		sampleRate = 1
	}

	keep := rand.Intn(int(sampleRate)) == 0

	logFields := map[string]interface{}{
		"sample_key":  sampleKey,
		"sample_rate": sampleRate,
		"sample_keep": keep,
		"trace_id":    trace.TraceID,
	}
	d.Logger.Debug().WithFields(logFields).Logf("got sample rate and decision")

	// Exactly one of the kept/dropped counters is bumped per decision.
	counter := "dynsampler_num_dropped"
	if keep {
		counter = "dynsampler_num_kept"
	}
	d.Metrics.IncrementCounter(counter)
	d.Metrics.Histogram("dynsampler_sample_rate", float64(sampleRate))

	return uint(sampleRate), keep
}
53 changes: 53 additions & 0 deletions sample/totalthroughput_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// +build all race

package sample

import (
"testing"

"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/logger"
"github.com/honeycombio/refinery/metrics"
"github.com/honeycombio/refinery/types"

"github.com/stretchr/testify/assert"
)

// TestTotalThroughputAddSampleRateKeyToTrace checks that, with
// AddSampleRateKeyToTrace enabled, running a trace through the sampler leaves
// the span count unchanged and writes the computed sampling key onto every
// span under the configured field name.
func TestTotalThroughputAddSampleRateKeyToTrace(t *testing.T) {
	const spanCount = 5

	metrics := metrics.MockMetrics{}
	metrics.Start()

	sampler := &TotalThroughputSampler{
		Config: &config.TotalThroughputSamplerConfig{
			FieldList:                    []string{"http.status_code"},
			AddSampleRateKeyToTrace:      true,
			AddSampleRateKeyToTraceField: "meta.key",
		},
		Logger:  &logger.NullLogger{},
		Metrics: &metrics,
	}

	trace := &types.Trace{}
	for i := 0; i < spanCount; i++ {
		trace.AddSpan(&types.Span{
			Event: types.Event{
				Data: map[string]interface{}{
					"http.status_code": "200",
				},
			},
		})
	}

	// A sampler that failed to start cannot produce a meaningful result, so
	// stop here instead of reporting confusing downstream failures.
	if err := sampler.Start(); err != nil {
		t.Fatalf("starting sampler: %v", err)
	}
	sampler.GetSampleRate(trace)

	spans := trace.GetSpans()
	assert.Len(t, spans, spanCount, "should have the same number of spans as input")

	// testify expects (expected, actual); keeping that order makes failure
	// output label the two sides correctly.
	expected := map[string]interface{}{
		"http.status_code": "200",
		"meta.key":         "200•,",
	}
	for _, span := range spans {
		assert.Equal(t, expected, span.Event.Data, "should add the sampling key to all spans in the trace")
	}
}

0 comments on commit 96843de

Please sign in to comment.