Skip to content

Commit

Permalink
Refactor collector pipeline to allow v1/v2 data model (#6484)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #6474

## Description of the changes
- Extend SpanProcessor interface to carry either v1 or v2 spans

## How was this change tested?
- CI

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Jan 5, 2025
1 parent 4024a24 commit b1153a0
Show file tree
Hide file tree
Showing 16 changed files with 289 additions and 97 deletions.
7 changes: 6 additions & 1 deletion cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,12 @@ func TestAggregator(t *testing.T) {
},
},
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
_, err := c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: spans,
Details: processor.Details{
SpanFormat: processor.JaegerSpanFormat,
},
})
require.NoError(t, err)
require.NoError(t, c.Close())

Expand Down
15 changes: 9 additions & 6 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
type batchConsumer struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
spanOptions processor.SpansOptions
spanOptions processor.Details // common settings for all spans
tenancyMgr *tenancy.Manager
}

func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.Manager) batchConsumer {
return batchConsumer{
logger: logger,
spanProcessor: spanProcessor,
spanOptions: processor.SpansOptions{
spanOptions: processor.Details{
InboundTransport: transport,
SpanFormat: spanFormat,
},
Expand All @@ -75,10 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error {
span.Process = batch.Process
}
}
_, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
_, err = c.spanProcessor.ProcessSpans(processor.SpansV1{
Spans: batch.Spans,
Details: processor.Details{
InboundTransport: c.spanOptions.InboundTransport,
SpanFormat: c.spanOptions.SpanFormat,
Tenant: tenant,
},
})
if err != nil {
if errors.Is(err, processor.ErrBusy) {
Expand Down
17 changes: 11 additions & 6 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -33,17 +34,21 @@ type mockSpanProcessor struct {
spanFormat processor.SpanFormat
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
oks := make([]bool, len(spans))
batch.GetSpans(func(spans []*model.Span) {
p.spans = append(p.spans, spans...)
}, func(_ ptrace.Traces) {
panic("not implemented")
})
oks := make([]bool, len(p.spans))
if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true
p.transport = opts.InboundTransport
p.spanFormat = opts.SpanFormat
p.tenants[batch.GetTenant()] = true
p.transport = batch.GetInboundTransport()
p.spanFormat = batch.GetSpanFormat()
return oks, p.expectedError
}

Expand Down
18 changes: 12 additions & 6 deletions cmd/collector/app/handler/thrift_span_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.JaegerSpanFormat,
},
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
Expand Down Expand Up @@ -105,9 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options
convCount[i] = len(converted)
mSpans = append(mSpans, converted...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{
Spans: mSpans,
Details: processor.Details{
InboundTransport: options.InboundTransport,
SpanFormat: processor.ZipkinSpanFormat,
},
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
Expand Down
14 changes: 11 additions & 3 deletions cmd/collector/app/handler/thrift_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand Down Expand Up @@ -58,12 +59,19 @@ type shouldIErrorProcessor struct {

var errTestError = errors.New("Whoops")

func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) {
func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) {
if s.shouldError {
return nil, errTestError
}
retMe := make([]bool, len(mSpans))
for i := range mSpans {
var spans []*model.Span
batch.GetSpans(func(sp []*model.Span) {
spans = sp
}, func(_ ptrace.Traces) {
panic("not implemented")
})

retMe := make([]bool, len(spans))
for i := range spans {
retMe[i] = true
}
return retMe, nil
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/model_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
package app

import (
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
)

// ProcessSpan processes a Domain Model Span
type ProcessSpan func(span *model.Span, tenant string)

// ProcessSpans processes a batch of Domain Model Spans
type ProcessSpans func(spans []*model.Span, tenant string)
type ProcessSpans func(spans processor.Batch)

// FilterSpan decides whether to allow or disallow a span
type FilterSpan func(span *model.Span) bool
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options {
ret.hostMetrics = metrics.NullFactory
}
if ret.preProcessSpans == nil {
ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {}
ret.preProcessSpans = func(_ processor.Batch) {}
}
if ret.sanitizer == nil {
ret.sanitizer = func(span *model.Span) *model.Span { return span }
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) {
Options.ServiceMetrics(metrics.NullFactory),
Options.Logger(zap.NewNop()),
Options.NumWorkers(5),
Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}),
Options.PreProcessSpans(func(_ processor.Batch) {}),
Options.Sanitizer(func(span *model.Span) *model.Span { return span }),
Options.QueueSize(10),
Options.DynQueueSizeWarmup(1000),
Expand All @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) {
assert.Nil(t, opts.collectorTags)
assert.False(t, opts.reportBusy)
assert.False(t, opts.blockingSubmit)
assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") })
assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) })
assert.NotPanics(t, func() { opts.preSave(nil, "") })
assert.True(t, opts.spanFilter(nil))
span := model.Span{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,11 @@ package processor

import (
"errors"
"io"

"github.com/jaegertracing/jaeger/model"
)

// ErrBusy signals that the processor cannot process incoming data.
var ErrBusy = errors.New("server busy")

// SpansOptions additional options passed to processor along with the spans.
type SpansOptions struct {
SpanFormat SpanFormat
InboundTransport InboundTransport
Tenant string
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error)
io.Closer
}

// InboundTransport identifies the transport used to receive spans.
type InboundTransport string

Expand Down
File renamed without changes.
71 changes: 71 additions & 0 deletions cmd/collector/app/processor/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2020 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package processor

import (
"io"

"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// Compile-time checks that both batch types satisfy the Batch interface.
var (
_ Batch = (*SpansV1)(nil)
_ Batch = (*SpansV2)(nil)
)

// Batch is a batch of spans passed to the processor. It abstracts over the
// data model version: the payload is either v1 spans ([]*model.Span, see
// SpansV1) or v2 traces (ptrace.Traces, see SpansV2).
type Batch interface {
// GetSpans delegates to the appropriate function based on the data model version:
// SpansV1 invokes v1 with its spans, SpansV2 invokes v2 with its traces.
GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces))

// GetSpanFormat returns the format of the spans in the batch.
GetSpanFormat() SpanFormat
// GetInboundTransport returns the transport used to receive the batch.
GetInboundTransport() InboundTransport
// GetTenant returns the tenant associated with the batch.
GetTenant() string
}

// SpanProcessor handles batches of spans in either data model version.
type SpanProcessor interface {
// ProcessSpans processes a batch of spans and returns either a list of
// true/false success indicators or an error.
ProcessSpans(spans Batch) ([]bool, error)
io.Closer
}

// Details carries the metadata that accompanies a batch of spans.
// It is embedded in SpansV1 and SpansV2 and supplies their
// GetSpanFormat, GetInboundTransport and GetTenant methods.
type Details struct {
SpanFormat SpanFormat // format of the spans in the batch
InboundTransport InboundTransport // transport used to receive the spans
Tenant string // tenant associated with the spans
}

// SpansV1 is a batch of spans in the v1 data model ([]*model.Span) passed to
// the processor, together with the Details describing the batch.
type SpansV1 struct {
Spans []*model.Span
Details
}

// SpansV2 is a batch of spans in the v2 data model (ptrace.Traces) passed to
// the processor, together with the Details describing the batch.
type SpansV2 struct {
Traces ptrace.Traces
Details
}

// GetSpans invokes the v1 callback with the batch's spans; the v2 callback
// is ignored.
func (s SpansV1) GetSpans(v1 func([]*model.Span), _ func(ptrace.Traces)) {
v1(s.Spans)
}

// GetSpans invokes the v2 callback with the batch's traces; the v1 callback
// is ignored.
func (s SpansV2) GetSpans(_ func([]*model.Span), v2 func(ptrace.Traces)) {
v2(s.Traces)
}

// GetSpanFormat returns the SpanFormat field.
func (d Details) GetSpanFormat() SpanFormat {
return d.SpanFormat
}

// GetInboundTransport returns the InboundTransport field.
func (d Details) GetInboundTransport() InboundTransport {
return d.InboundTransport
}

// GetTenant returns the Tenant field.
func (d Details) GetTenant() string {
return d.Tenant
}
66 changes: 66 additions & 0 deletions cmd/collector/app/processor/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package processor

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// TestDetails checks that every Details getter returns the matching field.
func TestDetails(t *testing.T) {
	var details Details
	details.SpanFormat = JaegerSpanFormat
	details.InboundTransport = GRPCTransport
	details.Tenant = "tenant"

	assert.Equal(t, JaegerSpanFormat, details.GetSpanFormat())
	assert.Equal(t, GRPCTransport, details.GetInboundTransport())
	assert.Equal(t, "tenant", details.GetTenant())
}

func TestSpansV1(t *testing.T) {
s := SpansV1{
Spans: []*model.Span{{}},
Details: Details{
SpanFormat: JaegerSpanFormat,
InboundTransport: GRPCTransport,
Tenant: "tenant",
},
}
var spans []*model.Span
s.GetSpans(func(s []*model.Span) {
spans = s
}, func(_ ptrace.Traces) {
panic("not implemented")
})
assert.Equal(t, []*model.Span{{}}, spans)
assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat())
assert.Equal(t, GRPCTransport, s.GetInboundTransport())
assert.Equal(t, "tenant", s.GetTenant())
}

func TestSpansV2(t *testing.T) {
s := SpansV2{
Traces: ptrace.NewTraces(),
Details: Details{
SpanFormat: JaegerSpanFormat,
InboundTransport: GRPCTransport,
Tenant: "tenant",
},
}
var traces ptrace.Traces
s.GetSpans(func(_ []*model.Span) {
panic("not implemented")
}, func(t ptrace.Traces) {
traces = t
})
assert.Equal(t, ptrace.NewTraces(), traces)
assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat())
assert.Equal(t, GRPCTransport, s.GetInboundTransport())
assert.Equal(t, "tenant", s.GetTenant())
}
3 changes: 1 addition & 2 deletions cmd/collector/app/server/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

Expand All @@ -27,6 +26,6 @@ func (*mockSpanProcessor) Close() error {
return nil
}

func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) {
func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) {
return []bool{}, nil
}
Loading

0 comments on commit b1153a0

Please sign in to comment.