Skip to content

Commit

Permalink
[tailsamplingprocessor] Support external decision cache implementatio…
Browse files Browse the repository at this point in the history
…ns (open-telemetry#37035)

#### Description

Adding a feature. This PR adds support for external implementations of
the decision cache. This allows the collector (or another service using
the processor) to supply an alternative decision cache based on
alternative algorithms or external services like memcached without
needing to explicitly add support for all possible options in the
processor.

It re-uses the existing functional options pattern and only exposes two
options for now: `WithSampledDecisionCache` and
`WithNonSampledDecisionCache`. I've left the other options unexported to
avoid bloating the external interface without a concrete use case. The
majority of changes are cleanup from the refactoring that moves `Option`
values into the `Config` struct instead of passing them as a variadic
parameter to `newTracesProcessor`.

---------

Co-authored-by: Yuna Verheyden <[email protected]>
  • Loading branch information
Logiraptor and yvrhdn authored Feb 12, 2025
1 parent b0340fd commit 876a359
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 76 deletions.
27 changes: 27 additions & 0 deletions .chloggen/tsp-external-cache.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for supplying external decision cache implementations when constructing the tail sampling processor in code.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37035]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions cmd/checkapi/allowlist.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
extension/observer
extension/opampcustommessages
processor/tailsamplingprocessor
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import (
"encoding/binary"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
package cache // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"

import "go.opentelemetry.io/collector/pdata/pcommon"

Expand Down
2 changes: 2 additions & 0 deletions processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,6 @@ type Config struct {
PolicyCfgs []PolicyCfg `mapstructure:"policies"`
// DecisionCache holds configuration for the decision cache(s)
DecisionCache DecisionCacheConfig `mapstructure:"decision_cache"`
// Options allows for additional configuration of the tail-based sampling processor in code.
Options []Option `mapstructure:"-"`
}
14 changes: 7 additions & 7 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
Expand Down Expand Up @@ -88,7 +88,7 @@ type Option func(*tailSamplingSpanProcessor)

// newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given
// configuration.
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config, opts ...Option) (processor.Traces, error) {
func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) {
telemetrySettings := set.TelemetrySettings
telemetry, err := metadata.NewTelemetryBuilder(telemetrySettings)
if err != nil {
Expand Down Expand Up @@ -124,7 +124,7 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
}
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

for _, opt := range opts {
for _, opt := range cfg.Options {
opt(tsp)
}

Expand Down Expand Up @@ -174,15 +174,15 @@ func withTickerFrequency(frequency time.Duration) Option {
}
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withSampledDecisionCache(c cache.Cache[bool]) Option {
// WithSampledDecisionCache returns an Option that installs c as the cache the
// processor uses to store recently sampled trace IDs. The cache is assigned
// when the returned Option is applied to a processor, not when this function
// is called.
func WithSampledDecisionCache(c cache.Cache[bool]) Option {
	apply := func(p *tailSamplingSpanProcessor) {
		p.sampledIDCache = c
	}
	return apply
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withNonSampledDecisionCache(c cache.Cache[bool]) Option {
// WithNonSampledDecisionCache sets the cache which the processor uses to store recently non-sampled trace IDs.
func WithNonSampledDecisionCache(c cache.Cache[bool]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.nonSampledIDCache = c
}
Expand Down
118 changes: 77 additions & 41 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/cache"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

func TestSamplingPolicyTypicalPath(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -33,7 +29,15 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -63,10 +67,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
}

func TestSamplingPolicyInvertSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -76,7 +76,15 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -106,10 +114,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
}

func TestSamplingMultiplePolicies(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -121,7 +125,15 @@ func TestSamplingMultiplePolicies(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -155,10 +167,6 @@ func TestSamplingMultiplePolicies(t *testing.T) {
}

func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -168,7 +176,15 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
{name: "mock-policy-1", evaluator: mpe1, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -199,10 +215,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
}

func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -214,7 +226,15 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -248,10 +268,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
}

func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -263,7 +279,15 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
{name: "mock-policy-2", evaluator: mpe2, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-2"))},
}

p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies))
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -316,10 +340,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
}

func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -331,7 +351,17 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withSampledDecisionCache(c))

cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
WithSampledDecisionCache(c),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -387,10 +417,6 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
}

func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
idb := newSyncIDBatcher()

Expand All @@ -402,7 +428,17 @@ func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))

cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
Options: []Option{
withDecisionBatcher(idb),
withPolicies(policies),
WithNonSampledDecisionCache(c),
},
}
p, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), nextConsumer, cfg)
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
Loading

0 comments on commit 876a359

Please sign in to comment.