Skip to content

Commit

Permalink
feat: Add dynamic sampler processor
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeGoldsmith committed Aug 21, 2024
1 parent d74349c commit 73603db
Show file tree
Hide file tree
Showing 16 changed files with 948 additions and 0 deletions.
14 changes: 14 additions & 0 deletions dynamicsamplingprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Reduce Processor

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fdynamicsampling%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fdynamicsampling) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fdynamicsampling%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fdynamicsampling) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@cartermp](https://www.github.com/cartermp) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

This processor can apply sampling decisions on trace data and is based on [dysampler-go](https://github.com/honeycombio/dynsampler-go/).
51 changes: 51 additions & 0 deletions dynamicsamplingprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package dynamicsamplingprocessor

import (
"fmt"

"go.opentelemetry.io/collector/component"
)

type SamplerType string

const (
EMADynamicSampler string = "EMADynamicSampler"
EMAThroughputSampler string = "EMAThroughputSampler"
)

type Config struct {
Sampler string `mapstructure:"sampler"`
KeyFields []string `mapstructure:"key_fields"`

// EMADynamicSampler specific configuration
GoalSampleRate int `mapstructure:"goal_sample_rate"`

// EMAThroughputSampler specific configuration
GoalThroughputPerSecond int `mapstructure:"goal_throughput_per_second"`
}

var _ component.Config = (*Config)(nil)

func (cfg *Config) Validate() error {
if cfg.Sampler == "" {
return fmt.Errorf("sampler must be set. Valid options: %s, %s", EMADynamicSampler, EMAThroughputSampler)
}

if cfg.Sampler != EMADynamicSampler && cfg.Sampler != EMAThroughputSampler {
return fmt.Errorf("sampler must be set to one of the following: %s, %s", EMADynamicSampler, EMAThroughputSampler)
}

if len(cfg.KeyFields) == 0 {
return fmt.Errorf("Must set at least one attribute to use as a key for dynamic sampling")
}

if cfg.Sampler == EMADynamicSampler && cfg.GoalSampleRate <= 0 {
return fmt.Errorf("EMADynamicSampler goal_sample_rate must be set and greater than 0")
}

if cfg.Sampler == EMAThroughputSampler && cfg.GoalThroughputPerSecond <= 0 {
return fmt.Errorf("EMAThroughputSampler goal_throughput_per_second must be set and greater than 0")
}

return nil
}
74 changes: 74 additions & 0 deletions dynamicsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package dynamicsamplingprocessor

import (
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()
tests := []struct {
name string
id string
expected component.Config
}{
{
name: "EMADynamicSampler correct config",
id: "EMADynamicSampler",
expected: &Config{
Sampler: EMADynamicSampler,
KeyFields: []string{"key1", "key2"},
GoalSampleRate: 10,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)
processors, err := cm.Sub("processors")
require.NoError(t, err)

factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := processors.Sub(tt.id)
require.NoError(t, err)
require.NoError(t, sub.Unmarshal(cfg))

assert.NoError(t, component.ValidateConfig(cfg))
assert.Equal(t, tt.expected, cfg)
})
}
}

// func TestLoadInvalidConfig(t *testing.T) {
// for _, test := range []struct {
// file string
// contains string
// }{
// {"invalid_negative.yaml", "sampling rate is negative"},
// {"invalid_small.yaml", "sampling rate is too small"},
// {"invalid_inf.yaml", "sampling rate is invalid: +Inf%"},
// {"invalid_prec.yaml", "sampling precision is too great"},
// {"invalid_zero.yaml", "invalid sampling precision"},
// } {
// t.Run(test.file, func(t *testing.T) {
// factories, err := otelcoltest.NopFactories()
// require.NoError(t, err)

// factory := NewFactory()
// factories.Processors[metadata.Type] = factory
// // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33594
// // nolint:staticcheck
// _, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", test.file), factories)
// require.ErrorContains(t, err, test.contains)
// })
// }
// }
15 changes: 15 additions & 0 deletions dynamicsamplingprocessor/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# dynamic_sampler

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_processor_dynamic_sampler_count_logs_sampled

Count of logs that were sampled or not

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |
36 changes: 36 additions & 0 deletions dynamicsamplingprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//go:generate mdatagen metadata.yaml

package dynamicsamplingprocessor

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"
)

func NewFactory() processor.Factory {
return processor.NewFactory(
component.MustNewType("dynamic_sampler"),
createDefaultConfig,
processor.WithLogs(createLogsProcessor, component.StabilityLevelDevelopment))
}

func createDefaultConfig() component.Config {
return &Config{
Sampler: EMADynamicSampler,
KeyFields: []string{"key1", "key2"},
GoalSampleRate: 10,
}
}

// createLogsProcessor creates a log processor based on this config.
func createLogsProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (processor.Logs, error) {
return newLogsProcessor(ctx, set, nextConsumer, cfg.(*Config))
}
76 changes: 76 additions & 0 deletions dynamicsamplingprocessor/generated_component_telemetry_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 73603db

Please sign in to comment.