Skip to content

Commit

Permalink
add-tests
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Jan 7, 2025
1 parent 280cab1 commit 324886e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 63 deletions.
52 changes: 33 additions & 19 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package app

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
Expand All @@ -17,6 +18,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
sanitizerv2 "github.com/jaegertracing/jaeger/cmd/jaeger/sanitizer"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/queue"
"github.com/jaegertracing/jaeger/pkg/telemetry"
Expand All @@ -38,6 +40,7 @@ type spanProcessor struct {
otelExporter exporter.Traces
queueResizeMu sync.Mutex
metrics *SpanProcessorMetrics
telset telemetry.Settings
preProcessSpans ProcessSpans
filterSpan FilterSpan // filter is called before the sanitizer but after preProcessSpans
sanitizer sanitizer.SanitizeSpan // sanitizer is called before processSpan
Expand Down Expand Up @@ -68,13 +71,18 @@ func NewSpanProcessor(
) (processor.SpanProcessor, error) {
sp, err := newSpanProcessor(traceWriter, additional, opts...)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not create span processor: %w", err)
}

sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) {
sp.processItemFromQueue(item)
})

err = sp.otelExporter.Start(context.Background(), sp.telset.Host)
if err != nil {
return nil, fmt.Errorf("could not start exporter: %w", err)
}

sp.background(1*time.Second, sp.updateGauges)

if sp.dynQueueSizeMemory > 0 {
Expand Down Expand Up @@ -106,6 +114,7 @@ func newSpanProcessor(traceWriter tracestore.Writer, additional []ProcessSpan, o
sp := spanProcessor{
queue: boundedQueue,
metrics: handlerMetrics,
telset: telemetry.NoopSettings(), // TODO get real settings
logger: options.logger,
preProcessSpans: options.preProcessSpans,
filterSpan: options.spanFilter,
Expand All @@ -126,23 +135,16 @@ func newSpanProcessor(traceWriter tracestore.Writer, additional []ProcessSpan, o
zap.Uint("queue-size-warmup", options.dynQueueSizeWarmup))
}
if options.dynQueueSizeMemory > 0 || options.spanSizeMetricsEnabled {
// add to processSpanFuncs
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
}

processSpanFuncs = append(processSpanFuncs, additional...)

sp.processSpan = ChainedProcessSpan(processSpanFuncs...)

// TODO get real settings
telset := telemetry.NoopSettings().ToOtelComponent()
set := exporter.Settings{
TelemetrySettings: telset,
}
// surprisingly exporterhelper ignores the config but requires it not be nil
anyCfg := struct{}{}

otelExporter, err := exporterhelper.NewTraces(context.Background(), set, anyCfg,
sp.processSpan = ChainedProcessSpan(append(processSpanFuncs, additional...)...)

otelExporter, err := exporterhelper.NewTraces(
context.Background(),
exporter.Settings{
TelemetrySettings: sp.telset.ToOtelComponent(),
},
struct{}{}, // surprisingly exporterhelper ignores the config but requires it not be nil
sp.pushTraces,
exporterhelper.WithQueue(exporterhelper.NewDefaultQueueConfig()),
// exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
Expand All @@ -153,7 +155,7 @@ func newSpanProcessor(traceWriter tracestore.Writer, additional []ProcessSpan, o
// exporterhelper.WithShutdown(oce.shutdown),
)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not create exporterhelper: %w", err)
}
sp.otelExporter = otelExporter

Expand All @@ -169,8 +171,20 @@ func (sp *spanProcessor) Close() error {

// pushTraces is called by exporterhelper's concurrent queue consumers.
func (sp *spanProcessor) pushTraces(ctx context.Context, td ptrace.Traces) error {
// TODO apply collector tags
// TODO call sanitizers
td = sanitizerv2.Sanitize(td)

if len(sp.collectorTags) > 0 {
for i := 0; i < td.ResourceSpans().Len(); i++ {
resource := td.ResourceSpans().At(i).Resource()
for k, v := range sp.collectorTags {
if _, ok := resource.Attributes().Get(k); ok {
continue // don't override existing keys
}
resource.Attributes().PutStr(k, v)
}
}
}

// TODO emit metrics

return sp.traceWriter.WriteTraces(ctx, td)
Expand Down
125 changes: 81 additions & 44 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
package app

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"reflect"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -168,13 +171,17 @@ func isSpanAllowed(span *model.Span) bool {
}

type fakeSpanWriter struct {
t *testing.T
spansLock sync.Mutex
spans []*model.Span
err error
tenants map[string]bool
}

func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
if n.t != nil {
n.t.Logf("Capturing span %+v", span)
}
n.spansLock.Lock()
defer n.spansLock.Unlock()
n.spans = append(n.spans, span)
Expand Down Expand Up @@ -380,54 +387,84 @@ func TestSpanProcessorWithNilProcess(t *testing.T) {
}

func TestSpanProcessorWithCollectorTags(t *testing.T) {
testCollectorTags := map[string]string{
"extra": "tag",
"env": "prod",
"node": "172.22.18.161",
}
for _, modelVersion := range []string{"v1", "v2"} {
t.Run(modelVersion, func(t *testing.T) {
testCollectorTags := map[string]string{
"extra": "tag",
"env": "prod",
"node": "172.22.18.161",
}

w := &fakeSpanWriter{}
w := &fakeSpanWriter{}

pp, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.CollectorTags(testCollectorTags))
require.NoError(t, err)
p := pp.(*spanProcessor)
pp, err := NewSpanProcessor(
v1adapter.NewTraceWriter(w),
nil,
Options.CollectorTags(testCollectorTags),
Options.NumWorkers(1),
Options.QueueSize(1),
)
require.NoError(t, err)
p := pp.(*spanProcessor)
t.Cleanup(func() {
require.NoError(t, p.Close())
})

defer require.NoError(t, p.Close())
span := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{
{
Key: "env",
VStr: "prod",
},
{
Key: "node",
VStr: "k8s-test-node-01",
},
}),
}
p.addCollectorTags(span)
expected := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{
{
Key: "env",
VStr: "prod",
},
{
Key: "extra",
VStr: "tag",
},
{
Key: "node",
VStr: "172.22.18.161",
},
{
Key: "node",
VStr: "k8s-test-node-01",
},
}),
}
span := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{
model.String("env", "prod"),
model.String("node", "k8s-test-node-01"),
}),
}

assert.Equal(t, expected.Process, span.Process)
var batch processor.Batch
if modelVersion == "v2" {
batch = processor.SpansV2{
Traces: v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}}),
}
} else {
batch = processor.SpansV1{
Spans: []*model.Span{span},
}
}
_, err = p.ProcessSpans(batch)
require.NoError(t, err)

require.Eventually(t, func() bool {
w.spansLock.Lock()
defer w.spansLock.Unlock()
return len(w.spans) > 0
}, time.Second, time.Millisecond)

w.spansLock.Lock()
defer w.spansLock.Unlock()
span = w.spans[0]

expected := &model.Span{
Process: model.NewProcess("unit-test-service", []model.KeyValue{
model.String("env", "prod"),
model.String("extra", "tag"),
model.String("node", "172.22.18.161"),
model.String("node", "k8s-test-node-01"),
}),
}
if modelVersion == "v2" {
// ptrace.Resource.Attributes do not allow duplicate keys,
// so we only add non-conflicting tags, meaning the node IP
// tag from the collectorTags will not be added.
expected.Process.Tags = slices.Delete(expected.Process.Tags, 2, 3)
typedTags := model.KeyValues(span.Process.Tags)
typedTags.Sort()
}

m := &jsonpb.Marshaler{Indent: " "}
jsonActual := new(bytes.Buffer)
m.Marshal(jsonActual, span.Process)
jsonExpected := new(bytes.Buffer)
m.Marshal(jsonExpected, expected.Process)
assert.Equal(t, jsonExpected.String(), jsonActual.String())
})
}
}

func TestSpanProcessorCountSpan(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions cmd/jaeger/sanitizer/sanitizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package sanitizer

import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/sanitizer"
)

// Sanitize is a function that applies all sanitizers to the given trace data.
var Sanitize = sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...)

0 comments on commit 324886e

Please sign in to comment.