[processor/tailsampling] record sampling policy #36312

Closed
wants to merge 8 commits into from
15 changes: 15 additions & 0 deletions .chloggen/tailsampling-record-policy.yaml
@@ -0,0 +1,15 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/tailsampling

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: |
  Adds support for optionally recording the policy (and any composite sub-policy) responsible for a tail sampling processor decision to sample a trace.
  This functionality is disabled by default. To enable it, pass the following feature gate to the collector: `+processor.tailsamplingprocessor.recordpolicy`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35180]
10 changes: 10 additions & 0 deletions processor/tailsamplingprocessor/README.md
@@ -528,6 +528,16 @@ sum (otelcol_processor_tail_sampling_count_traces_sampled) by (policy)

As a reminder, a policy voting to sample the trace does not guarantee sampling; an "inverted not" decision from another policy would still discard the trace.

### Tracking sampling policy
To understand _which_ sampling policy decided to include a trace, enable the `processor.tailsamplingprocessor.recordpolicy` feature gate, which records the policy responsible for sampling each trace.

When this feature gate is enabled, the processor adds the following attributes to the scope of each sampled span:

| Attribute | Description | Present? |
|---------------------------------|---------------------------------------------------------------------------|----------------------------|
| `tailsampling.policy` | Records the configured name of the policy that sampled a trace | Always |
| `tailsampling.composite_policy` | Records the configured name of a composite subpolicy that sampled a trace | When composite policy used |

### Policy Evaluation Errors

```
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/composite_helper.go
@@ -7,6 +7,7 @@ import (
"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
)

func getNewCompositePolicy(settings component.TelemetrySettings, config *CompositeCfg) (sampling.PolicyEvaluator, error) {
@@ -22,10 +23,11 @@ func getNewCompositePolicy(settings component.TelemetrySettings, config *Composi
evalParams := sampling.SubPolicyEvalParams{
Evaluator: policy,
MaxSpansPerSecond: int64(rateAllocationsMap[policyCfg.Name]),
Name: policyCfg.Name,
}
subPolicyEvalParams[i] = evalParams
}
return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}), nil
return sampling.NewComposite(settings.Logger, config.MaxTotalSpansPerSecond, subPolicyEvalParams, sampling.MonotonicClock{}, telemetry.IsRecordPolicyEnabled()), nil
}

// Apply rate allocations to the sub-policies
4 changes: 3 additions & 1 deletion processor/tailsamplingprocessor/composite_helper_test.go
@@ -52,12 +52,14 @@ func TestCompositeHelper(t *testing.T) {
{
Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 100, 0),
MaxSpansPerSecond: 250,
Name: "test-composite-policy-1",
},
{
Evaluator: sampling.NewLatency(componenttest.NewNopTelemetrySettings(), 200, 0),
MaxSpansPerSecond: 500,
Name: "test-composite-policy-2",
},
}, sampling.MonotonicClock{})
}, sampling.MonotonicClock{}, false)
assert.Equal(t, expected, actual)
})

8 changes: 7 additions & 1 deletion processor/tailsamplingprocessor/factory.go
@@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry"
)

// NewFactory returns a new factory for the Tail Sampling processor.
@@ -38,5 +39,10 @@ func createTracesProcessor(
nextConsumer consumer.Traces,
) (processor.Traces, error) {
tCfg := cfg.(*Config)
return newTracesProcessor(ctx, params, nextConsumer, *tCfg)
opts := []Option{}

if telemetry.IsRecordPolicyEnabled() {
opts = append(opts, withRecordPolicy())
}
return newTracesProcessor(ctx, params, nextConsumer, *tCfg, opts...)
}
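
The gate-dependent wiring in `createTracesProcessor` follows Go's functional options pattern. A minimal sketch of that pattern is below. It assumes a simplified stand-in type (`tailSampler`) and a hypothetical helper (`buildOptions`); only the names `Option` and `withRecordPolicy` come from the diff, and their bodies here are assumptions rather than the processor's real implementation.

```go
package main

import "fmt"

// tailSampler is a hypothetical stand-in for the real processor type.
type tailSampler struct {
	recordPolicy bool
}

// Option mutates a tailSampler during construction.
type Option func(*tailSampler)

// withRecordPolicy mirrors the option name used in the diff; this body is an assumption.
func withRecordPolicy() Option {
	return func(p *tailSampler) { p.recordPolicy = true }
}

// newTailSampler applies each option in order to a zero-valued sampler.
func newTailSampler(opts ...Option) *tailSampler {
	p := &tailSampler{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

// buildOptions (hypothetical) appends the option only when the gate is enabled.
func buildOptions(gateEnabled bool) []Option {
	opts := []Option{}
	if gateEnabled {
		opts = append(opts, withRecordPolicy())
	}
	return opts
}

func main() {
	fmt.Println(newTailSampler(buildOptions(true)...).recordPolicy)  // prints "true"
	fmt.Println(newTailSampler(buildOptions(false)...).recordPolicy) // prints "false"
}
```

Keeping the gate check in the factory means the constructor itself never reads global state, which makes it straightforward to exercise both paths in tests.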

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions processor/tailsamplingprocessor/internal/sampling/composite.go
@@ -19,6 +19,8 @@ type subpolicy struct {

// spans per second that each subpolicy sampled in this period
sampledSPS int64

name string
}

// Composite evaluator and its internal data
@@ -35,7 +37,8 @@ type Composite struct {
// The time provider (can be different from clock for testing purposes)
timeProvider TimeProvider

logger *zap.Logger
logger *zap.Logger
recordSubPolicy bool
}

var _ PolicyEvaluator = (*Composite)(nil)
@@ -44,6 +47,7 @@ var _ PolicyEvaluator = (*Composite)(nil)
type SubPolicyEvalParams struct {
Evaluator PolicyEvaluator
MaxSpansPerSecond int64
Name string
}

// NewComposite creates a policy evaluator that samples all subpolicies.
@@ -52,25 +56,27 @@ func NewComposite(
maxTotalSpansPerSecond int64,
subPolicyParams []SubPolicyEvalParams,
timeProvider TimeProvider,
recordSubPolicy bool,
) PolicyEvaluator {
var subpolicies []*subpolicy

for i := 0; i < len(subPolicyParams); i++ {
sub := &subpolicy{}
sub.evaluator = subPolicyParams[i].Evaluator
sub.allocatedSPS = subPolicyParams[i].MaxSpansPerSecond

sub.name = subPolicyParams[i].Name
// We are just starting, so there is no previous input, set it to 0
sub.sampledSPS = 0

subpolicies = append(subpolicies, sub)
}

return &Composite{
maxTotalSPS: maxTotalSpansPerSecond,
subpolicies: subpolicies,
timeProvider: timeProvider,
logger: logger,
maxTotalSPS: maxTotalSpansPerSecond,
subpolicies: subpolicies,
timeProvider: timeProvider,
logger: logger,
recordSubPolicy: recordSubPolicy,
}
}

@@ -110,6 +116,9 @@ func (c *Composite) Evaluate(ctx context.Context, traceID pcommon.TraceID, trace
sub.sampledSPS = spansInSecondIfSampled

// Let the sampling happen
if c.recordSubPolicy {
SetAttrOnScopeSpans(trace, "tailsampling.composite_policy", sub.name)
}
return Sampled, nil
}
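
The recording step in `Evaluate` can be illustrated with a simplified model: the first sub-policy that votes to sample wins, and its configured name is what gets recorded. The sketch below deliberately omits the per-sub-policy rate limiting that the real evaluator performs. All types and the `evaluateComposite` function are hypothetical stand-ins, not the sampling package's API.

```go
package main

import "fmt"

// decision is a simplified stand-in for the sampling package's Decision type.
type decision int

const (
	notSampled decision = iota
	sampled
)

// subPolicy pairs a hypothetical evaluator function with its configured name.
type subPolicy struct {
	name     string
	evaluate func(spanCount int) decision
}

// evaluateComposite returns the first positive decision and, when record is
// true, the name of the sub-policy that produced it. Rate limiting is omitted.
func evaluateComposite(subs []subPolicy, spanCount int, record bool) (decision, string) {
	for _, sub := range subs {
		if sub.evaluate(spanCount) == sampled {
			if record {
				return sampled, sub.name
			}
			return sampled, ""
		}
	}
	return notSampled, ""
}

func main() {
	subs := []subPolicy{
		{name: "eval-1", evaluate: func(int) decision { return notSampled }},
		{name: "eval-2", evaluate: func(int) decision { return sampled }},
	}
	d, name := evaluateComposite(subs, 1, true)
	fmt.Println(d == sampled, name) // prints "true eval-2"
}
```

This mirrors the expectation in the `_RecordSubPolicy` tests, where the second sub-policy (`eval-2`) is the one whose name ends up in `tailsampling.composite_policy`.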

@@ -60,7 +60,7 @@ func TestCompositeEvaluatorNotSampled(t *testing.T) {
// Create 2 policies which do not match any trace
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 200, 300, false)
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false)

trace := createTrace()

@@ -77,7 +77,7 @@ func TestCompositeEvaluatorSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100}, {n2, 100}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, false)

trace := createTrace()

@@ -89,13 +89,32 @@ func TestCompositeEvaluatorSampled(t *testing.T) {
assert.Equal(t, expected, decision)
}

func TestCompositeEvaluatorSampled_RecordSubPolicy(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 1000, []SubPolicyEvalParams{{n1, 100, "eval-1"}, {n2, 100, "eval-2"}}, FakeTimeProvider{}, true)

trace := newTraceWithKV(traceID, "test-key", 0)

decision, err := c.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate composite policy: %v", err)

// The second policy is AlwaysSample, so the decision should be Sampled.
expected := Sampled
assert.Equal(t, expected, decision)
val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy")
assert.True(t, ok, "Did not find expected key")
assert.Equal(t, "eval-2", val.AsString())
}

func TestCompositeEvaluator_OverflowAlwaysSampled(t *testing.T) {
timeProvider := &FakeTimeProvider{second: 0}

// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1}, {n2, 1}}, timeProvider)
c := NewComposite(zap.NewNop(), 3, []SubPolicyEvalParams{{n1, 1, "eval-1"}, {n2, 1, "eval-2"}}, timeProvider, false)

trace := newTraceWithKV(traceID, "tag", int64(10))

@@ -128,7 +147,7 @@ func TestCompositeEvaluatorSampled_AlwaysSampled(t *testing.T) {
// Create 2 subpolicies. First results in 100% NotSampled, the second in 100% Sampled.
n1 := NewNumericAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", 0, 100, false)
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false)

for i := 1; i <= 10; i++ {
trace := createTrace()
@@ -146,7 +165,7 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) {
// The first policy does not match, the second matches through invert
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false)
n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true)
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20}, {n2, 20}}, FakeTimeProvider{})
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, false)

for i := 1; i <= 10; i++ {
trace := createTrace()
@@ -160,12 +179,33 @@ func TestCompositeEvaluatorInverseSampled_AlwaysSampled(t *testing.T) {
}
}

func TestCompositeEvaluatorInverseSampled_AlwaysSampled_RecordSubPolicy(t *testing.T) {
// The first policy does not match, the second matches through invert
n1 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, false)
n2 := NewStringAttributeFilter(componenttest.NewNopTelemetrySettings(), "tag", []string{"foo"}, false, 0, true)
c := NewComposite(zap.NewNop(), 10, []SubPolicyEvalParams{{n1, 20, "eval-1"}, {n2, 20, "eval-2"}}, FakeTimeProvider{}, true)

for i := 1; i <= 10; i++ {
trace := newTraceWithKV(traceID, "test-key", 0)

decision, err := c.Evaluate(context.Background(), traceID, trace)
require.NoError(t, err, "Failed to evaluate composite policy: %v", err)

// The second policy matches through invert, so the decision should be Sampled.
expected := Sampled
assert.Equal(t, expected, decision)
val, ok := trace.ReceivedBatches.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().Get("tailsampling.composite_policy")
assert.True(t, ok, "Did not find expected key")
assert.Equal(t, "eval-2", val.AsString())
}
}

func TestCompositeEvaluatorThrottling(t *testing.T) {
// Create only one subpolicy, with 100% Sampled policy.
n1 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
timeProvider := &FakeTimeProvider{second: 0}
const totalSPS = 10
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS}}, timeProvider)
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS, "eval-1"}}, timeProvider, false)

trace := createTrace()

@@ -205,7 +245,7 @@ func TestCompositeEvaluator2SubpolicyThrottling(t *testing.T) {
n2 := NewAlwaysSample(componenttest.NewNopTelemetrySettings())
timeProvider := &FakeTimeProvider{second: 0}
const totalSPS = 10
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2}, {n2, totalSPS / 2}}, timeProvider)
c := NewComposite(zap.NewNop(), totalSPS, []SubPolicyEvalParams{{n1, totalSPS / 2, "eval-1"}, {n2, totalSPS / 2, "eval-2"}}, timeProvider, false)

trace := createTrace()

14 changes: 14 additions & 0 deletions processor/tailsamplingprocessor/internal/sampling/util.go
@@ -93,3 +93,17 @@ func invertHasInstrumentationLibrarySpanWithCondition(ilss ptrace.ScopeSpansSlic
}
return true
}

// SetAttrOnScopeSpans sets the string attribute attrName to attrValue on the
// scope of every ScopeSpans in the trace, overwriting any existing value.
// It holds the trace's mutex while mutating the received batches.
func SetAttrOnScopeSpans(data *TraceData, attrName string, attrValue string) {
	data.Mutex.Lock()
	defer data.Mutex.Unlock()

	rs := data.ReceivedBatches.ResourceSpans()
	for i := 0; i < rs.Len(); i++ {
		rss := rs.At(i)
		for j := 0; j < rss.ScopeSpans().Len(); j++ {
			ss := rss.ScopeSpans().At(j)
			ss.Scope().Attributes().PutStr(attrName, attrValue)
		}
	}
}
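
The helper above writes the attribute once per scope, not once per span, and a later call with the same attribute name replaces the earlier value. The following standard-library-only sketch models that behavior. The `scope`, `resource`, and `traceData` types and the `setAttrOnScopes` and `newTrace` functions are hypothetical simplifications of the pdata and sampling types, used here only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// scope, resource, and traceData are simplified, hypothetical stand-ins for
// the pdata and sampling types used by the processor.
type scope struct{ attrs map[string]string }

type resource struct{ scopes []*scope }

type traceData struct {
	mu        sync.Mutex
	resources []*resource
}

// setAttrOnScopes writes name=value onto every scope, overwriting any
// previous value, while holding the trace's lock.
func setAttrOnScopes(td *traceData, name, value string) {
	td.mu.Lock()
	defer td.mu.Unlock()
	for _, r := range td.resources {
		for _, s := range r.scopes {
			s.attrs[name] = value
		}
	}
}

// newTrace builds a trace with one resource holding n empty scopes.
func newTrace(n int) *traceData {
	r := &resource{}
	for i := 0; i < n; i++ {
		r.scopes = append(r.scopes, &scope{attrs: map[string]string{}})
	}
	return &traceData{resources: []*resource{r}}
}

func main() {
	td := newTrace(2)
	setAttrOnScopes(td, "tailsampling.composite_policy", "eval-1")
	setAttrOnScopes(td, "tailsampling.composite_policy", "eval-2")
	for _, s := range td.resources[0].scopes {
		fmt.Println(s.attrs["tailsampling.composite_policy"]) // prints "eval-2" for each scope
	}
}
```

Because the value lives on the scope, consumers that read span attributes directly will not see it; they need to look at the scope attributes of the enclosing `ScopeSpans`.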