diff --git a/config/file_config.go b/config/file_config.go index 32b8752029..0dc441eada 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -60,7 +60,7 @@ type configContents struct { Logger string `validate:"required,oneof= logrus honeycomb"` LoggingLevel string `validate:"required"` Collector string `validate:"required,oneof= InMemCollector"` - Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler"` + Sampler string `validate:"required,oneof= DeterministicSampler DynamicSampler EMADynamicSampler RulesBasedSampler TotalThroughputSampler"` Metrics string `validate:"required,oneof= prometheus honeycomb"` SendDelay time.Duration `validate:"required"` TraceTimeout time.Duration `validate:"required"` @@ -481,6 +481,8 @@ func (f *fileConfig) GetSamplerConfigForDataset(dataset string) (interface{}, er i = &EMADynamicSamplerConfig{} case "RulesBasedSampler": i = &RulesBasedSamplerConfig{} + case "TotalThroughputSampler": + i = &TotalThroughputSamplerConfig{} default: return nil, errors.New("No Sampler found") } @@ -502,6 +504,8 @@ func (f *fileConfig) GetSamplerConfigForDataset(dataset string) (interface{}, er i = &EMADynamicSamplerConfig{} case "RulesBasedSampler": i = &RulesBasedSamplerConfig{} + case "TotalThroughputSampler": + i = &TotalThroughputSamplerConfig{} default: return nil, errors.New("No Sampler found") } diff --git a/config/sampler_config.go b/config/sampler_config.go index c03c23e52c..170ed932c8 100644 --- a/config/sampler_config.go +++ b/config/sampler_config.go @@ -27,3 +27,12 @@ type EMADynamicSamplerConfig struct { AddSampleRateKeyToTrace bool AddSampleRateKeyToTraceField string } + +type TotalThroughputSamplerConfig struct { + GoalThroughputPerSec int64 `validate:"gte=1"` + ClearFrequencySec int64 + FieldList []string `validate:"required"` + UseTraceLength bool + AddSampleRateKeyToTrace bool + AddSampleRateKeyToTraceField string +} diff --git a/sample/sample.go b/sample/sample.go index 7b56db2dc1..f312162f4c 100644 --- a/sample/sample.go +++ b/sample/sample.go @@ -47,6 +47,10 @@ func (s *SamplerFactory) GetSamplerImplementationForDataset(dataset string) Samp ds := &RulesBasedSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} ds.Start() sampler = ds + case *config.TotalThroughputSamplerConfig: + ds := &TotalThroughputSampler{Config: c, Logger: s.Logger, Metrics: s.Metrics} + ds.Start() + sampler = ds default: s.Logger.Error().Logf("unknown sampler type %T. Exiting.", c) os.Exit(1) diff --git a/sample/totalthroughput.go b/sample/totalthroughput.go new file mode 100644 index 0000000000..3752f34d25 --- /dev/null +++ b/sample/totalthroughput.go @@ -0,0 +1,76 @@ +package sample + +import ( + "math/rand" + + dynsampler "github.com/honeycombio/dynsampler-go" + + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/types" +) + +type TotalThroughputSampler struct { + Config *config.TotalThroughputSamplerConfig + Logger logger.Logger + Metrics metrics.Metrics + + goalThroughputPerSec int64 + clearFrequencySec int64 + + key *traceKey + + dynsampler dynsampler.Sampler +} + +func (d *TotalThroughputSampler) Start() error { + d.Logger.Debug().Logf("Starting TotalThroughputSampler") + defer func() { d.Logger.Debug().Logf("Finished starting TotalThroughputSampler") }() + if d.Config.GoalThroughputPerSec < 1 { + d.Logger.Debug().Logf("configured sample rate for dynamic sampler was %d; forcing to 100", d.Config.GoalThroughputPerSec) + d.Config.GoalThroughputPerSec = 100 + } + d.goalThroughputPerSec = d.Config.GoalThroughputPerSec + if d.Config.ClearFrequencySec == 0 { + d.Config.ClearFrequencySec = 30 + } + d.clearFrequencySec = d.Config.ClearFrequencySec + d.key = newTraceKey(d.Config.FieldList, d.Config.UseTraceLength, d.Config.AddSampleRateKeyToTrace, d.Config.AddSampleRateKeyToTraceField) + + // spin up the actual dynamic sampler + d.dynsampler = &dynsampler.TotalThroughput{ + GoalThroughputPerSec: int(d.goalThroughputPerSec), + ClearFrequencySec: int(d.clearFrequencySec), + } + d.dynsampler.Start() + + // Register stastics this package will produce + d.Metrics.Register("dynsampler_num_dropped", "counter") + d.Metrics.Register("dynsampler_num_kept", "counter") + d.Metrics.Register("dynsampler_sample_rate", "histogram") + + return nil +} + +func (d *TotalThroughputSampler) GetSampleRate(trace *types.Trace) (uint, bool) { + key := d.key.buildAndAdd(trace) + rate := d.dynsampler.GetSampleRate(key) + if rate < 1 { // protect against dynsampler being broken even though it shouldn't be + rate = 1 + } + shouldKeep := rand.Intn(int(rate)) == 0 + d.Logger.Debug().WithFields(map[string]interface{}{ + "sample_key": key, + "sample_rate": rate, + "sample_keep": shouldKeep, + "trace_id": trace.TraceID, + }).Logf("got sample rate and decision") + if shouldKeep { + d.Metrics.IncrementCounter("dynsampler_num_kept") + } else { + d.Metrics.IncrementCounter("dynsampler_num_dropped") + } + d.Metrics.Histogram("dynsampler_sample_rate", float64(rate)) + return uint(rate), shouldKeep +} diff --git a/sample/totalthroughput_test.go b/sample/totalthroughput_test.go new file mode 100644 index 0000000000..edef7d42a0 --- /dev/null +++ b/sample/totalthroughput_test.go @@ -0,0 +1,53 @@ +// +build all race + +package sample + +import ( + "testing" + + "github.com/honeycombio/refinery/config" + "github.com/honeycombio/refinery/logger" + "github.com/honeycombio/refinery/metrics" + "github.com/honeycombio/refinery/types" + + "github.com/stretchr/testify/assert" +) + +func TestTotalThroughputAddSampleRateKeyToTrace(t *testing.T) { + const spanCount = 5 + + metrics := metrics.MockMetrics{} + metrics.Start() + + sampler := &TotalThroughputSampler{ + Config: &config.TotalThroughputSamplerConfig{ + FieldList: []string{"http.status_code"}, + AddSampleRateKeyToTrace: true, + AddSampleRateKeyToTraceField: "meta.key", + }, + Logger: &logger.NullLogger{}, + Metrics: &metrics, + } + + trace := &types.Trace{} + for i := 0; i < spanCount; i++ { + trace.AddSpan(&types.Span{ + Event: types.Event{ + Data: map[string]interface{}{ + "http.status_code": "200", + }, + }, + }) + } + sampler.Start() + sampler.GetSampleRate(trace) + + spans := trace.GetSpans() + assert.Len(t, spans, spanCount, "should have the same number of spans as input") + for _, span := range spans { + assert.Equal(t, span.Event.Data, map[string]interface{}{ + "http.status_code": "200", + "meta.key": "200•,", + }, "should add the sampling key to all spans in the trace") + } +}