From b1153a0b8df90051c20e0bbcc66d56d9b4d476b9 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sun, 5 Jan 2025 17:26:58 -0400 Subject: [PATCH] Refactor collector pipeline to allow v1/v2 data model (#6484) ## Which problem is this PR solving? - Part of #6474 ## Description of the changes - Extend SpanProcessor interface to carry either v1 or v2 spans ## How was this change tested? - CI --------- Signed-off-by: Yuri Shkuro --- cmd/collector/app/collector_test.go | 7 +- cmd/collector/app/handler/grpc_handler.go | 15 ++- .../app/handler/grpc_handler_test.go | 17 ++- .../app/handler/thrift_span_handler.go | 18 ++- .../app/handler/thrift_span_handler_test.go | 14 +- cmd/collector/app/model_consumer.go | 3 +- cmd/collector/app/options.go | 2 +- cmd/collector/app/options_test.go | 4 +- .../processor/{interface.go => constants.go} | 17 --- .../{empty_test.go => package_test.go} | 0 cmd/collector/app/processor/processor.go | 71 ++++++++++ cmd/collector/app/processor/processor_test.go | 66 ++++++++++ cmd/collector/app/server/test.go | 3 +- cmd/collector/app/span_processor.go | 23 +++- cmd/collector/app/span_processor_test.go | 124 +++++++++++------- .../internal/integration/trace_writer.go | 2 +- 16 files changed, 289 insertions(+), 97 deletions(-) rename cmd/collector/app/processor/{interface.go => constants.go} (70%) rename cmd/collector/app/processor/{empty_test.go => package_test.go} (100%) create mode 100644 cmd/collector/app/processor/processor.go create mode 100644 cmd/collector/app/processor/processor_test.go diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 4211f88cbbe..4e42109a2e0 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -262,7 +262,12 @@ func TestAggregator(t *testing.T) { }, }, } - _, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + _, err := c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: spans, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) require.NoError(t, c.Close()) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 853bd899f11..f88343a3de1 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -47,7 +47,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor - spanOptions processor.SpansOptions + spanOptions processor.Details // common settings for all spans tenancyMgr *tenancy.Manager } @@ -55,7 +55,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, return batchConsumer{ logger: logger, spanProcessor: spanProcessor, - spanOptions: processor.SpansOptions{ + spanOptions: processor.Details{ InboundTransport: transport, SpanFormat: spanFormat, }, @@ -75,10 +75,13 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { span.Process = batch.Process } } - _, err = c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{ - InboundTransport: c.spanOptions.InboundTransport, - SpanFormat: c.spanOptions.SpanFormat, - Tenant: tenant, + _, err = c.spanProcessor.ProcessSpans(processor.SpansV1{ + Spans: batch.Spans, + Details: processor.Details{ + InboundTransport: c.spanOptions.InboundTransport, + SpanFormat: c.spanOptions.SpanFormat, + Tenant: tenant, + }, }) if err != nil { if errors.Is(err, processor.ErrBusy) { diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 2501ced7461..e7940e916ed 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -33,17 +34,21 @@ type mockSpanProcessor struct { spanFormat processor.SpanFormat } -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() - p.spans = append(p.spans, spans...) - oks := make([]bool, len(spans)) + batch.GetSpans(func(spans []*model.Span) { + p.spans = append(p.spans, spans...) + }, func(_ ptrace.Traces) { + panic("not implemented") + }) + oks := make([]bool, len(p.spans)) if p.tenants == nil { p.tenants = make(map[string]bool) } - p.tenants[opts.Tenant] = true - p.transport = opts.InboundTransport - p.spanFormat = opts.SpanFormat + p.tenants[batch.GetTenant()] = true + p.transport = batch.GetInboundTransport() + p.spanFormat = batch.GetSpanFormat() return oks, p.expectedError } diff --git a/cmd/collector/app/handler/thrift_span_handler.go b/cmd/collector/app/handler/thrift_span_handler.go index b6940b4bc3c..24f4061d1c2 100644 --- a/cmd/collector/app/handler/thrift_span_handler.go +++ b/cmd/collector/app/handler/thrift_span_handler.go @@ -54,9 +54,12 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ - InboundTransport: options.InboundTransport, - SpanFormat: processor.JaegerSpanFormat, + oks, err := jbh.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.JaegerSpanFormat, + }, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -105,9 +108,12 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options convCount[i] = len(converted) mSpans = append(mSpans, converted...) } - bools, err := h.modelProcessor.ProcessSpans(mSpans, processor.SpansOptions{ - InboundTransport: options.InboundTransport, - SpanFormat: processor.ZipkinSpanFormat, + bools, err := h.modelProcessor.ProcessSpans(processor.SpansV1{ + Spans: mSpans, + Details: processor.Details{ + InboundTransport: options.InboundTransport, + SpanFormat: processor.ZipkinSpanFormat, + }, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/handler/thrift_span_handler_test.go b/cmd/collector/app/handler/thrift_span_handler_test.go index 512944fc284..16e5f5fe7c2 100644 --- a/cmd/collector/app/handler/thrift_span_handler_test.go +++ b/cmd/collector/app/handler/thrift_span_handler_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" @@ -58,12 +59,19 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ processor.SpansOptions) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { if s.shouldError { return nil, errTestError } - retMe := make([]bool, len(mSpans)) - for i := range mSpans { + var spans []*model.Span + batch.GetSpans(func(sp []*model.Span) { + spans = sp + }, func(_ ptrace.Traces) { + panic("not implemented") + }) + + retMe := make([]bool, len(spans)) + for i := range spans { retMe[i] = true } return retMe, nil diff --git a/cmd/collector/app/model_consumer.go b/cmd/collector/app/model_consumer.go index a4a7636982e..e94377d49ae 100644 --- a/cmd/collector/app/model_consumer.go +++ b/cmd/collector/app/model_consumer.go @@ -5,6 +5,7 @@ package app import ( + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" ) @@ -12,7 +13,7 @@ import ( type ProcessSpan func(span *model.Span, tenant string) // ProcessSpans processes a batch of Domain Model Spans -type ProcessSpans func(spans []*model.Span, tenant string) +type ProcessSpans func(spans processor.Batch) // FilterSpan decides whether to allow or disallow a span type FilterSpan func(span *model.Span) bool diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 03f4eb8c6f7..3d8f745ae50 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -176,7 +176,7 @@ func (options) apply(opts ...Option) options { ret.hostMetrics = metrics.NullFactory } if ret.preProcessSpans == nil { - ret.preProcessSpans = func(_ []*model.Span, _ /* tenant */ string) {} + ret.preProcessSpans = func(_ processor.Batch) {} } if ret.sanitizer == nil { ret.sanitizer = func(span *model.Span) *model.Span { return span } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 94f388920ae..1bd4791e8b2 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -27,7 +27,7 @@ func TestAllOptionSet(t *testing.T) { Options.ServiceMetrics(metrics.NullFactory), Options.Logger(zap.NewNop()), Options.NumWorkers(5), - Options.PreProcessSpans(func(_ []*model.Span, _ /* tenant */ string) {}), + Options.PreProcessSpans(func(_ processor.Batch) {}), Options.Sanitizer(func(span *model.Span) *model.Span { return span }), Options.QueueSize(10), Options.DynQueueSizeWarmup(1000), @@ -53,7 +53,7 @@ func TestNoOptionsSet(t *testing.T) { assert.Nil(t, opts.collectorTags) assert.False(t, opts.reportBusy) assert.False(t, opts.blockingSubmit) - assert.NotPanics(t, func() { opts.preProcessSpans(nil, "") }) + assert.NotPanics(t, func() { opts.preProcessSpans(processor.SpansV1{}) }) assert.NotPanics(t, func() { opts.preSave(nil, "") }) assert.True(t, opts.spanFilter(nil)) span := model.Span{} diff --git a/cmd/collector/app/processor/interface.go b/cmd/collector/app/processor/constants.go similarity index 70% rename from cmd/collector/app/processor/interface.go rename to cmd/collector/app/processor/constants.go index 03fe67c0b57..e639bc5a539 100644 --- a/cmd/collector/app/processor/interface.go +++ b/cmd/collector/app/processor/constants.go @@ -5,28 +5,11 @@ package processor import ( "errors" - "io" - - "github.com/jaegertracing/jaeger/model" ) // ErrBusy signalizes that processor cannot process incoming data var ErrBusy = errors.New("server busy") -// SpansOptions additional options passed to processor along with the spans. -type SpansOptions struct { - SpanFormat SpanFormat - InboundTransport InboundTransport - Tenant string -} - -// SpanProcessor handles model spans -type SpanProcessor interface { - // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(mSpans []*model.Span, options SpansOptions) ([]bool, error) - io.Closer -} - // InboundTransport identifies the transport used to receive spans. type InboundTransport string diff --git a/cmd/collector/app/processor/empty_test.go b/cmd/collector/app/processor/package_test.go similarity index 100% rename from cmd/collector/app/processor/empty_test.go rename to cmd/collector/app/processor/package_test.go diff --git a/cmd/collector/app/processor/processor.go b/cmd/collector/app/processor/processor.go new file mode 100644 index 00000000000..363a27d20de --- /dev/null +++ b/cmd/collector/app/processor/processor.go @@ -0,0 +1,71 @@ +// Copyright (c) 2020 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "io" + + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +var ( + _ Batch = (*SpansV1)(nil) + _ Batch = (*SpansV2)(nil) +) + +// Batch is a batch of spans passed to the processor. +type Batch interface { + // GetSpans delegates to the appropriate function based on the data model version. + GetSpans(v1 func(spans []*model.Span), v2 func(traces ptrace.Traces)) + + GetSpanFormat() SpanFormat + GetInboundTransport() InboundTransport + GetTenant() string +} + +// SpanProcessor handles spans +type SpanProcessor interface { + // ProcessSpans processes spans and return with either a list of true/false success or an error + ProcessSpans(spans Batch) ([]bool, error) + io.Closer +} + +type Details struct { + SpanFormat SpanFormat + InboundTransport InboundTransport + Tenant string +} + +// Spans is a batch of spans passed to the processor. +type SpansV1 struct { + Spans []*model.Span + Details +} + +type SpansV2 struct { + Traces ptrace.Traces + Details +} + +func (s SpansV1) GetSpans(v1 func([]*model.Span), _ func(ptrace.Traces)) { + v1(s.Spans) +} + +func (s SpansV2) GetSpans(_ func([]*model.Span), v2 func(ptrace.Traces)) { + v2(s.Traces) +} + +func (d Details) GetSpanFormat() SpanFormat { + return d.SpanFormat +} + +func (d Details) GetInboundTransport() InboundTransport { + return d.InboundTransport +} + +func (d Details) GetTenant() string { + return d.Tenant +} diff --git a/cmd/collector/app/processor/processor_test.go b/cmd/collector/app/processor/processor_test.go new file mode 100644 index 00000000000..d187a23c96e --- /dev/null +++ b/cmd/collector/app/processor/processor_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package processor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +func TestDetails(t *testing.T) { + d := Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: GRPCTransport, + Tenant: "tenant", + } + assert.Equal(t, JaegerSpanFormat, d.GetSpanFormat()) + assert.Equal(t, GRPCTransport, d.GetInboundTransport()) + assert.Equal(t, "tenant", d.GetTenant()) +} + +func TestSpansV1(t *testing.T) { + s := SpansV1{ + Spans: []*model.Span{{}}, + Details: Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: GRPCTransport, + Tenant: "tenant", + }, + } + var spans []*model.Span + s.GetSpans(func(s []*model.Span) { + spans = s + }, func(_ ptrace.Traces) { + panic("not implemented") + }) + assert.Equal(t, []*model.Span{{}}, spans) + assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) + assert.Equal(t, GRPCTransport, s.GetInboundTransport()) + assert.Equal(t, "tenant", s.GetTenant()) +} + +func TestSpansV2(t *testing.T) { + s := SpansV2{ + Traces: ptrace.NewTraces(), + Details: Details{ + SpanFormat: JaegerSpanFormat, + InboundTransport: GRPCTransport, + Tenant: "tenant", + }, + } + var traces ptrace.Traces + s.GetSpans(func(_ []*model.Span) { + panic("not implemented") + }, func(t ptrace.Traces) { + traces = t + }) + assert.Equal(t, ptrace.NewTraces(), traces) + assert.Equal(t, JaegerSpanFormat, s.GetSpanFormat()) + assert.Equal(t, GRPCTransport, s.GetInboundTransport()) + assert.Equal(t, "tenant", s.GetTenant()) +} diff --git a/cmd/collector/app/server/test.go b/cmd/collector/app/server/test.go index 57dbebe48a4..a047191f6b1 100644 --- a/cmd/collector/app/server/test.go +++ b/cmd/collector/app/server/test.go @@ -7,7 +7,6 @@ import ( "context" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" - "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -27,6 +26,6 @@ func (*mockSpanProcessor) Close() error { return nil } -func (*mockSpanProcessor) ProcessSpans([]*model.Span, processor.SpansOptions) ([]bool, error) { +func (*mockSpanProcessor) ProcessSpans(_ processor.Batch) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 8377abb363a..a3541cd3c6c 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" @@ -164,22 +165,29 @@ func (sp *spanProcessor) countSpan(span *model.Span, _ string /* tenant */) { sp.spansProcessed.Add(1) } -func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.SpansOptions) ([]bool, error) { - sp.preProcessSpans(mSpans, options.Tenant) - sp.metrics.BatchSize.Update(int64(len(mSpans))) - retMe := make([]bool, len(mSpans)) +func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { + sp.preProcessSpans(batch) + var spans []*model.Span + batch.GetSpans(func(spansV1 []*model.Span) { + spans = spansV1 + }, func(_ ptrace.Traces) { + panic("not implemented") + }) + sp.metrics.BatchSize.Update(int64(len(spans))) + retMe := make([]bool, len(spans)) // Note: this is not the ideal place to do this because collector tags are added to Process.Tags, // and Process can be shared between different spans in the batch, but we no longer know that, // the relation is lost upstream and it's impossible in Go to dedupe pointers. But at least here // we have a single thread updating all spans that may share the same Process, before concurrency // kicks in. - for _, span := range mSpans { + for _, span := range spans { sp.addCollectorTags(span) } - for i, mSpan := range mSpans { - ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport, options.Tenant) + for i, mSpan := range spans { + // TODO does this have to be one span at a time? + ok := sp.enqueueSpan(mSpan, batch.GetSpanFormat(), batch.GetInboundTransport(), batch.GetTenant()) if !ok && sp.reportBusy { return nil, processor.ErrBusy } @@ -189,6 +197,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options processor.Sp } func (sp *spanProcessor) processItemFromQueue(item *queueItem) { + // TODO calling sanitizer here contradicts the comment in enqueueSpan about immutable Process. sp.processSpan(sp.sanitizer(item.span), item.tenant) sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index c57f73b4bd8..f52494098e9 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -238,8 +238,12 @@ func TestSpanProcessor(t *testing.T) { p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) res, err := p.ProcessSpans( - []*model.Span{{}}, // empty span should be enriched by sanitizers - processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + processor.SpansV1{ + Spans: []*model.Span{{}}, // empty span should be enriched by sanitizers + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -263,13 +267,18 @@ func TestSpanProcessorErrors(t *testing.T) { Options.QueueSize(1), ).(*spanProcessor) - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -315,23 +324,28 @@ func TestSpanProcessorBusy(t *testing.T) { w.Lock() defer w.Unlock() - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, - }, - { - Process: &model.Process{ - ServiceName: "x", + { + Process: &model.Process{ + ServiceName: "x", + }, }, - }, - { - Process: &model.Process{ - ServiceName: "x", + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.Error(t, err, "expecting busy error") assert.Nil(t, res) @@ -612,13 +626,18 @@ func TestAdditionalProcessors(t *testing.T) { // nil doesn't fail p := NewSpanProcessor(w, nil, Options.QueueSize(1)) - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -629,13 +648,18 @@ func TestAdditionalProcessors(t *testing.T) { count++ } p = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) - res, err = p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) require.NoError(t, p.Close()) @@ -648,14 +672,17 @@ func TestSpanProcessorContextPropagation(t *testing.T) { dummyTenant := "context-prop-test-tenant" - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", + res, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + { + Process: &model.Process{ + ServiceName: "x", + }, }, }, - }, processor.SpansOptions{ - Tenant: dummyTenant, + Details: processor.Details{ + Tenant: dummyTenant, + }, }) require.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -687,10 +714,14 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { w.Lock() defer w.Unlock() - opts := processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat} - _, err := p.ProcessSpans([]*model.Span{ - {OperationName: "op1"}, - }, opts) + _, err := p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + {OperationName: "op1"}, + }, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.NoError(t, err) // Wait for the sole worker to pick the item from the queue and block @@ -700,10 +731,15 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { // Now the queue is empty again and can accept one more item, but no workers available. // If we send two items, the last one will have to be dropped. - _, err = p.ProcessSpans([]*model.Span{ - {OperationName: "op2"}, - {OperationName: "op3"}, - }, opts) + _, err = p.ProcessSpans(processor.SpansV1{ + Spans: []*model.Span{ + {OperationName: "op2"}, + {OperationName: "op3"}, + }, + Details: processor.Details{ + SpanFormat: processor.JaegerSpanFormat, + }, + }) require.EqualError(t, err, processor.ErrBusy.Error()) assert.Equal(t, []string{"op3"}, droppedOperations) } diff --git a/cmd/jaeger/internal/integration/trace_writer.go b/cmd/jaeger/internal/integration/trace_writer.go index c5286aa44d0..999ce34fb4d 100644 --- a/cmd/jaeger/internal/integration/trace_writer.go +++ b/cmd/jaeger/internal/integration/trace_writer.go @@ -81,7 +81,7 @@ func (w *traceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { scope ptrace.ScopeSpans resource ptrace.ResourceSpans ) - + if spanCount == MaxChunkSize { err = w.exporter.ConsumeTraces(ctx, currentChunk) currentChunk = ptrace.NewTraces()