diff --git a/.github/workflows/ci-e2e-kafka.yml b/.github/workflows/ci-e2e-kafka.yml index d56d00468036..c79af13d1bcf 100644 --- a/.github/workflows/ci-e2e-kafka.yml +++ b/.github/workflows/ci-e2e-kafka.yml @@ -17,8 +17,9 @@ jobs: strategy: fail-fast: false matrix: - jaeger-version: [v1, v2] # Adjust if there are specific versions of Jaeger - name: kafka ${{ matrix.jaeger-version }} + jaeger-version: [v1, v2] + kafka-version: ["3.x", "2.x"] + name: kafka ${{matrix.kafka-version }} ${{ matrix.jaeger-version }} steps: - name: Harden Runner uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1 @@ -31,12 +32,12 @@ jobs: with: go-version: 1.23.x - - name: Run Kafka integration tests + - name: Run kafka integration tests id: test-execution - run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }} + run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }} -v ${{ matrix.kafka-version }} - name: Upload coverage to codecov uses: ./.github/actions/upload-codecov with: files: cover.out - flags: kafka-${{ matrix.jaeger-version }} + flags: kafka-${{ matrix.kafka-version }}-${{ matrix.jaeger-version }} diff --git a/.golangci.yml b/.golangci.yml index 5427041391e6..bac7db011bda 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -139,6 +139,21 @@ linters-settings: files: - "**_test.go" + disallow-otel-contrib-translator: + deny: + - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger + desc: "Use v1adapter package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" + files: + - "!**/v1adapter/**" + + # TODO: remove once we have upgraded to Go 1.23 + disallow-iter: + deny: + - pkg: iter + desc: "Use github.com/jaegertracing/jaeger/pkg/iter" + files: + - "**" + goimports: local-prefixes: github.com/jaegertracing/jaeger gosec: diff --git a/README.md b/README.md index 783edaeb5999..b843db4fbf3d 100644 --- a/README.md +++ b/README.md @@ -47,19 +47,19 @@ Jaeger is an open source project with open governance. We welcome contributions ## Version Compatibility Guarantees -Occasionally, CLI flags can be deprecated due to, for example, usability improvements or new functionality. +Since Jaeger uses many components from the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector/) we try to maintain configuration compatibility between Jaeger releases. Occasionally, configuration options in Jaeger (or in Jaeger v1 CLI flags) can be deprecated due to usability improvements, new functionality, or changes in our dependencies. In such situations, developers introducing the deprecation are required to follow [these guidelines](./CONTRIBUTING.md#deprecating-cli-flags). -In short, for a deprecated CLI flag, you should expect to see the following message in the `--help` documentation: +In short, for a deprecated configuration option, you should expect to see the following message in the documentation or release notes: ``` (deprecated, will be removed after yyyy-mm-dd or in release vX.Y.Z, whichever is later) ``` A grace period of at least **3 months** or **two minor version bumps** (whichever is later) from the first release -containing the deprecation notice will be provided before the deprecated CLI flag _can_ be deleted. +containing the deprecation notice will be provided before the deprecated configuration option _can_ be deleted. -For example, consider a scenario where v1.28.0 is released on 01-Jun-2021 containing a deprecation notice for a CLI flag. -This flag will remain in a deprecated state until the later of 01-Sep-2021 or v1.30.0 where it _can_ be removed on or after either of those events. +For example, consider a scenario where v2.0.0 is released on 01-Sep-2024 containing a deprecation notice for a configuration option. +This configuration option will remain in a deprecated state until the later of 01-Dec-2024 or v2.2.0 where it _can_ be removed on or after either of those events. It may remain deprecated for longer than the aforementioned grace period. ## Go Version Compatibility Guarantees diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index ba450e4a2980..228ad80b666b 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -7,7 +7,6 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confignet" @@ -24,6 +23,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/pkg/tenancy" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var _ component.Host = (*otelHost)(nil) // API check @@ -108,7 +108,7 @@ type consumerDelegate struct { } func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := v1adapter.ProtoFromTraces(td) for _, batch := range batches { err := c.batchConsumer.consume(ctx, batch) if err != nil { diff --git a/cmd/jaeger/internal/all-in-one.yaml b/cmd/jaeger/internal/all-in-one.yaml index 65a1d84e60c0..2f1ab3145608 100644 --- a/cmd/jaeger/internal/all-in-one.yaml +++ b/cmd/jaeger/internal/all-in-one.yaml @@ -62,6 +62,10 @@ receivers: endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:14250" thrift_http: endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:14268" + thrift_binary: + endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:6832" + thrift_compact: + endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:6831" zipkin: endpoint: "${env:JAEGER_LISTEN_HOST:-localhost}:9411" diff --git a/cmd/jaeger/internal/integration/span_writer.go b/cmd/jaeger/internal/integration/span_writer.go index da722b1681ff..073eaec65eeb 100644 --- a/cmd/jaeger/internal/integration/span_writer.go +++ b/cmd/jaeger/internal/integration/span_writer.go @@ -9,7 +9,6 @@ import ( "io" "time" - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" @@ -19,6 +18,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var ( @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error { } func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ + td := v1adapter.V1BatchesToTraces([]*model.Batch{ { Spans: []*model.Span{span}, Process: span.Process, }, }) - if err != nil { - return err - } return w.exporter.ConsumeTraces(ctx, td) } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index 4d14f7f99c47..6a42151b1da0 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -7,7 +7,6 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/ptrace" @@ -15,6 +14,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type traceProcessor struct { @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error { } func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := v1adapter.ProtoFromTraces(td) for _, batch := range batches { for _, span := range batch.Spans { if span.Process == nil { diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 1a39ca798452..1d1bba2f4cc9 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -4,15 +4,14 @@ package apiv3 import ( - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) func modelToOTLP(spans []*model.Span) ptrace.Traces { batch := &model.Batch{Spans: spans} - // there is never an error returned from ProtoToTraces - tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + tr := v1adapter.V1BatchesToTraces([]*model.Batch{batch}) return tr } diff --git a/cmd/query/app/otlp_translator.go b/cmd/query/app/otlp_translator.go index 06f7c63e4409..82ed887d3229 100644 --- a/cmd/query/app/otlp_translator.go +++ b/cmd/query/app/otlp_translator.go @@ -6,10 +6,10 @@ package app import ( "fmt" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) { @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err) } - jaegerBatches := model2otel.ProtoFromTraces(otlpTraces) + jaegerBatches := v1adapter.ProtoFromTraces(otlpTraces) var traces []*model.Trace traceMap := make(map[model.TraceID]*model.Trace) for _, batch := range jaegerBatches { diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go new file mode 100644 index 000000000000..c3ea335aa2f9 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -0,0 +1,207 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "fmt" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/pkg/otelsemconv" +) + +const ( + warningDuplicateSpanID = "duplicate span IDs; skipping clock skew adjustment" + warningMissingParentSpanID = "parent span ID=%s is not in the trace; skipping clock skew adjustment" + warningMaxDeltaExceeded = "max clock skew adjustment delta of %v exceeded; not applying calculated delta of %v" + warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v" +) + +// CorrectClockSkew returns an Adjuster that corrects span timestamps for clock skew. +// +// This adjuster modifies the start and log timestamps of child spans that are +// inconsistent with their parent spans due to clock differences between hosts. +// It assumes all spans have unique IDs and should be used after SpanIDUniquifier. +// +// The adjuster determines if two spans belong to the same source by deriving a +// unique string representation of a host based on resource attributes, +// such as `host.id`, `host.ip`, or `host.name`. +// If two spans have the same host key, they are considered to be from +// the same source, and no clock skew adjustment is expected between them. +// +// Parameters: +// - maxDelta: The maximum allowable time adjustment. Adjustments exceeding +// this value will be ignored. +func CorrectClockSkew(maxDelta time.Duration) Adjuster { + return Func(func(traces ptrace.Traces) { + adjuster := &clockSkewAdjuster{ + traces: traces, + maxDelta: maxDelta, + } + adjuster.buildNodesMap() + adjuster.buildSubGraphs() + for _, root := range adjuster.roots { + skew := clockSkew{hostKey: root.hostKey} + adjuster.adjustNode(root, nil, skew) + } + }) +} + +type clockSkewAdjuster struct { + traces ptrace.Traces + maxDelta time.Duration + spans map[pcommon.SpanID]*node + roots map[pcommon.SpanID]*node +} + +type clockSkew struct { + delta time.Duration + hostKey string +} + +type node struct { + span ptrace.Span + children []*node + hostKey string +} + +// hostKey derives a unique string representation of a host based on resource attributes. +// This is used to determine if two spans are from the same host. +func hostKey(resource ptrace.ResourceSpans) string { + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIDKey)); ok { + return attr.Str() + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIPKey)); ok { + if attr.Type() == pcommon.ValueTypeStr { + return attr.Str() + } else if attr.Type() == pcommon.ValueTypeSlice { + ips := attr.Slice() + if ips.Len() > 0 { + return ips.At(0).AsString() + } + } + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostNameKey)); ok { + return attr.Str() + } + return "" +} + +// buildNodesMap creates a mapping of span IDs to their corresponding nodes. +func (a *clockSkewAdjuster) buildNodesMap() { + a.spans = make(map[pcommon.SpanID]*node) + resources := a.traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + hk := hostKey(resource) + scopes := resource.ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + if _, exists := a.spans[span.SpanID()]; exists { + jptrace.AddWarnings(span, warningDuplicateSpanID) + } else { + a.spans[span.SpanID()] = &node{ + span: span, + hostKey: hk, + } + } + } + } + } +} + +// finds all spans that have no parent, i.e. where parentID is either 0 +// or points to an ID for which there is no span. +func (a *clockSkewAdjuster) buildSubGraphs() { + a.roots = make(map[pcommon.SpanID]*node) + for _, n := range a.spans { + if n.span.ParentSpanID() == pcommon.NewSpanIDEmpty() { + a.roots[n.span.SpanID()] = n + continue + } + if p, ok := a.spans[n.span.ParentSpanID()]; ok { + p.children = append(p.children, n) + } else { + warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID()) + jptrace.AddWarnings(n.span, warning) + // treat spans with invalid parent ID as root spans + a.roots[n.span.SpanID()] = n + } + } +} + +func (a *clockSkewAdjuster) adjustNode(n *node, parent *node, skew clockSkew) { + if (n.hostKey != skew.hostKey || n.hostKey == "") && parent != nil { + // Node n is from a different host. The parent has already been adjusted, + // so we can compare this node's timestamps against the parent. + skew = clockSkew{ + hostKey: n.hostKey, + delta: a.calculateSkew(n, parent), + } + } + a.adjustTimestamps(n, skew) + for _, child := range n.children { + a.adjustNode(child, n, skew) + } +} + +func (*clockSkewAdjuster) calculateSkew(child *node, parent *node) time.Duration { + parentStartTime := parent.span.StartTimestamp().AsTime() + childStartTime := child.span.StartTimestamp().AsTime() + parentEndTime := parent.span.EndTimestamp().AsTime() + childEndTime := child.span.EndTimestamp().AsTime() + parentDuration := parentEndTime.Sub(parentStartTime) + childDuration := childEndTime.Sub(childStartTime) + + if childDuration > parentDuration { + // When the child lasted longer than the parent, it was either + // async or the parent may have timed out before child responded. + // The only reasonable adjustment we can do in this case is to make + // sure the child does not start before parent. + if childStartTime.Before(parentStartTime) { + return parentStartTime.Sub(childStartTime) + } + return 0 + } + if !childStartTime.Before(parentStartTime) && !childEndTime.After(parentEndTime) { + // child already fits within the parent span, do not adjust + return 0 + } + // Assume that network latency is equally split between req and res. + latency := (parentDuration - childDuration) / 2 + // Goal: parentStartTime + latency = childStartTime + adjustment + return parentStartTime.Add(latency).Sub(childStartTime) +} + +func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { + if skew.delta == 0 { + return + } + if absDuration(skew.delta) > a.maxDelta { + if a.maxDelta == 0 { + jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + return + } + jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + return + } + n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta))) + jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) + for i := 0; i < n.span.Events().Len(); i++ { + event := n.span.Events().At(i) + event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta))) + } +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -1 * d + } + return d +} diff --git a/cmd/query/app/querysvc/adjuster/clockskew_test.go b/cmd/query/app/querysvc/adjuster/clockskew_test.go new file mode 100644 index 000000000000..cfa94e0c292b --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew_test.go @@ -0,0 +1,292 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" +) + +func TestClockSkewAdjuster(t *testing.T) { + type testSpan struct { + id, parent [8]byte + startTime, duration int + events []int // timestamps for logs + host string + adjusted int // start time after adjustment + adjustedEvents []int // adjusted log timestamps + } + + toTime := func(t int) time.Time { + return time.Unix(0, (time.Duration(t) * time.Millisecond).Nanoseconds()) + } + + // helper function that constructs a trace from a list of span prototypes + makeTrace := func(spanPrototypes []testSpan) ptrace.Traces { + trace := ptrace.NewTraces() + for _, spanProto := range spanPrototypes { + traceID := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1}) + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanProto.id) + span.SetParentSpanID(spanProto.parent) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime + spanProto.duration))) + + events := ptrace.NewSpanEventSlice() + for _, log := range spanProto.events { + event := events.AppendEmpty() + event.SetTimestamp(pcommon.NewTimestampFromTime(toTime(log))) + event.Attributes().PutStr("event", "some event") + } + events.CopyTo(span.Events()) + + resource := ptrace.NewResourceSpans() + resource.Resource().Attributes().PutEmptySlice("host.ip").AppendEmpty().SetStr(spanProto.host) + + span.CopyTo(resource.ScopeSpans().AppendEmpty().Spans().AppendEmpty()) + resource.CopyTo(trace.ResourceSpans().AppendEmpty()) + } + return trace + } + + testCases := []struct { + description string + trace []testSpan + err string + maxAdjust time.Duration + }{ + { + description: "single span with bad parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0, 0, 0, 0, 0, 0, 0, 99}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "parent span ID=0000000000000063 is not in the trace; skipping clock skew adjustment", // 99 == 0x63 + }, + { + description: "single span with empty host key", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, adjusted: 0}, + }, + }, + { + description: "two spans with the same ID", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "duplicate span IDs; skipping clock skew adjustment", + }, + { + description: "parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 10, duration: 50, host: "a", adjusted: 10}, + }, + }, + { + description: "do not adjust parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "a", adjusted: 0}, + }, + }, + { + description: "do not adjust child that fits inside parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 50, host: "b", adjusted: 20}, + }, + }, + { + description: "do not adjust child that is longer than parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 150, host: "b", adjusted: 20}, + }, + }, + { + description: "do not apply positive adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of 35ms", + }, + { + description: "do not apply negative adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 80, duration: 50, host: "b", adjusted: 80}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of -45ms", + }, + { + description: "do not apply adjustment due to disabled adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + err: "clock skew adjustment disabled; not applying calculated delta of 35ms", + }, + { + description: "adjust child starting before parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency = (100-50) / 2 = 25 + // delta = (10 - 0) + latency = 35 + { + id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 35, + events: []int{5, 10}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child starting before parent even if it is longer", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 150, host: "b", adjusted: 10}, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child ending after parent but being shorter", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency: (100 - 70) / 2 = 15 + // new child start time: 10 + latency = 25, delta = -25 + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 50, duration: 70, host: "b", adjusted: 25}, + // same host 'b', so same delta = -25 + // new start time: 60 + delta = 35 + { + id: [8]byte{3}, parent: [8]byte{2}, startTime: 60, duration: 20, host: "b", adjusted: 35, + events: []int{65, 70}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + } + + for _, tt := range testCases { + testCase := tt // capture loop var + t.Run(testCase.description, func(t *testing.T) { + trace := makeTrace(testCase.trace) + adjuster := CorrectClockSkew(tt.maxAdjust) + adjuster.Adjust(trace) + + var gotErr string + for i, proto := range testCase.trace { + id := proto.id + span := trace.ResourceSpans().At(i).ScopeSpans().At(0).Spans().At(0) + require.EqualValues(t, proto.id, span.SpanID(), "expecting span with span ID = %d", id) + + warnings := jptrace.GetWarnings(span) + if testCase.err == "" { + if proto.adjusted == proto.startTime { + assert.Empty(t, warnings, "no warnings in span %s", span.SpanID) + } else { + assert.Len(t, warnings, 1, "warning about adjutment added to span %s", span.SpanID) + } + } else { + if len(warnings) > 0 { + gotErr = warnings[0] + } + } + + // compare values as int because assert.Equal prints uint64 as hex + assert.Equal( + t, toTime(proto.adjusted).UTC(), span.StartTimestamp().AsTime(), + "adjusted start time of span[ID = %d]", id) + for i, logTs := range proto.adjustedEvents { + assert.Equal( + t, toTime(logTs).UTC(), span.Events().At(i).Timestamp().AsTime(), + "adjusted log timestamp of span[ID = %d], log[%d]", id, i) + } + } + assert.Equal(t, testCase.err, gotErr) + }) + } +} + +func TestHostKey(t *testing.T) { + tests := []struct { + name string + resource ptrace.ResourceSpans + expected string + }{ + { + name: "host.id attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.id", "host-123") + return rs + }(), + expected: "host-123", + }, + { + name: "string host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.ip", "192.168.1.1") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "slice host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + addresses := rs.Resource().Attributes().PutEmptySlice("host.ip") + addresses.AppendEmpty().SetStr("192.168.1.1") + addresses.AppendEmpty().SetStr("192.168.1.2") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "empty host.ip attribute slice", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutEmptySlice("host.ip") + return rs + }(), + expected: "", + }, + { + name: "host.name attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.name", "hostname") + return rs + }(), + expected: "hostname", + }, + { + name: "no relevant attributes", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("service.name", "service-123") + return rs + }(), + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := hostKey(tt.resource) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go index 25afbbc1621b..458258c7c3c1 100644 --- a/cmd/query/app/querysvc/adjuster/hash.go +++ b/cmd/query/app/querysvc/adjuster/hash.go @@ -14,7 +14,7 @@ import ( var _ Adjuster = (*SpanHashDeduper)(nil) -// SpanHash creates an adjuster that deduplicates spans by removing all but one span +// DeduplicateSpans creates an adjuster that deduplicates spans by removing all but one span // with the same hash code. This is particularly useful for scenarios where spans // may be duplicated during archival, such as with ElasticSearch archival. // @@ -23,8 +23,8 @@ var _ Adjuster = (*SpanHashDeduper)(nil) // // To ensure consistent hash codes, this adjuster should be executed after // SortAttributesAndEvents, which normalizes the order of collections within the span. -func SpanHash() SpanHashDeduper { - return SpanHashDeduper{ +func DeduplicateSpans() *SpanHashDeduper { + return &SpanHashDeduper{ marshaler: &ptrace.ProtoMarshaler{}, } } @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) { hashTrace, ) if err != nil { - jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err)) + jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err)) span.CopyTo(dedupedSpans.AppendEmpty()) continue } diff --git a/cmd/query/app/querysvc/adjuster/hash_test.go b/cmd/query/app/querysvc/adjuster/hash_test.go index dd90d7ab1a37..d5f977954182 100644 --- a/cmd/query/app/querysvc/adjuster/hash_test.go +++ b/cmd/query/app/querysvc/adjuster/hash_test.go @@ -13,7 +13,7 @@ import ( ) func TestSpanHash_EmptySpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := ptrace.NewTraces() expected := ptrace.NewTraces() adjuster.Adjust(input) @@ -21,7 +21,7 @@ func TestSpanHash_EmptySpans(t *testing.T) { } func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() @@ -126,7 +126,7 @@ func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { } func TestSpanHash_NoDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -180,7 +180,7 @@ func TestSpanHash_NoDuplicateSpans(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -234,7 +234,7 @@ func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/ipattribute.go b/cmd/query/app/querysvc/adjuster/ipattribute.go index a00f93b3254d..89c361a0cc3a 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute.go @@ -19,10 +19,10 @@ var ipAttributesToCorrect = map[string]struct{}{ "peer.ipv4": {}, } -// IPAttribute returns an adjuster that replaces numeric "ip" attributes, +// NormalizeIPAttributes returns an adjuster that replaces numeric "ip" attributes, // which usually contain IPv4 packed into uint32, with their string // representation (e.g. "8.8.8.8""). -func IPAttribute() IPAttributeAdjuster { +func NormalizeIPAttributes() IPAttributeAdjuster { return IPAttributeAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/ipattribute_test.go b/cmd/query/app/querysvc/adjuster/ipattribute_test.go index d97f1bf35ede..4e5d09172305 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute_test.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute_test.go @@ -58,7 +58,7 @@ func TestIPAttributeAdjuster(t *testing.T) { } } - IPAttribute().Adjust(traces) + NormalizeIPAttributes().Adjust(traces) resourceSpan := traces.ResourceSpans().At(0) assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len()) diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes.go b/cmd/query/app/querysvc/adjuster/libraryattributes.go similarity index 88% rename from cmd/query/app/querysvc/adjuster/resourceattributes.go rename to cmd/query/app/querysvc/adjuster/libraryattributes.go index 6bedd4ff4e9e..418a1206c29f 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes.go @@ -21,11 +21,11 @@ var libraryKeys = map[string]struct{}{ string(otelsemconv.TelemetryDistroVersionKey): {}, } -// ResourceAttributes creates an adjuster that moves the OpenTelemetry library +// MoveLibraryAttributes creates an adjuster that moves the OpenTelemetry library // attributes from spans to the parent resource so that the UI can // display them separately under Process. // https://github.com/jaegertracing/jaeger/issues/4534 -func ResourceAttributes() ResourceAttributesAdjuster { +func MoveLibraryAttributes() ResourceAttributesAdjuster { return ResourceAttributesAdjuster{} } @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom for k, v := range replace { existing, ok := resource.Attributes().Get(k) if ok && existing.AsRaw() != v.AsRaw() { - jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k) + jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k) continue } v.CopyTo(resource.Attributes().PutEmpty(k)) diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go similarity index 97% rename from cmd/query/app/querysvc/adjuster/resourceattributes_test.go rename to cmd/query/app/querysvc/adjuster/libraryattributes_test.go index 591820db4a26..ebcf8801080b 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go @@ -25,7 +25,7 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) { span.Attributes().PutStr(string(otelsemconv.TelemetryDistroVersionKey), "blah") span.Attributes().PutStr("another_key", "another_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -67,7 +67,7 @@ func TestResourceAttributesAdjuster_SpanWithoutLibraryAttributes(t *testing.T) { span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() span.Attributes().PutStr("random_key", "random_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -85,7 +85,7 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Java") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) @@ -119,7 +119,7 @@ func TestResourceAttributesAdjuster_SpanWithNonConflictingLibraryAttributes(t *t span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Go") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() diff --git a/cmd/query/app/querysvc/adjuster/sort.go b/cmd/query/app/querysvc/adjuster/sort.go index 0ed38af3286b..96f50de4a499 100644 --- a/cmd/query/app/querysvc/adjuster/sort.go +++ b/cmd/query/app/querysvc/adjuster/sort.go @@ -12,14 +12,14 @@ import ( var _ Adjuster = (*SortAttributesAndEventsAdjuster)(nil) -// SortAttributesAndEvents creates an adjuster that standardizes trace data by sorting elements: +// SortCollections creates an adjuster that standardizes trace data by sorting elements: // - Resource attributes are sorted lexicographically by their keys. // - Scope attributes are sorted lexicographically by their keys. // - Span attributes are sorted lexicographically by their keys. // - Span events are sorted lexicographically by their names. // - Attributes within each span event are sorted lexicographically by their keys. // - Attributes within each span link are sorted lexicographically by their keys. -func SortAttributesAndEvents() SortAttributesAndEventsAdjuster { +func SortCollections() SortAttributesAndEventsAdjuster { return SortAttributesAndEventsAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/sort_test.go b/cmd/query/app/querysvc/adjuster/sort_test.go index d9bef8baa379..ba9d849cff85 100644 --- a/cmd/query/app/querysvc/adjuster/sort_test.go +++ b/cmd/query/app/querysvc/adjuster/sort_test.go @@ -11,7 +11,7 @@ import ( ) func TestSortAttributesAndEventsAdjuster(t *testing.T) { - adjuster := SortAttributesAndEvents() + adjuster := SortCollections() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go index b21ba7eeb6db..687fea06162c 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go @@ -15,7 +15,7 @@ import ( var errTooManySpans = errors.New("cannot assign unique span ID, too many spans in the trace") -// SpanIDUniquifier returns an adjuster that changes span ids for server +// DeduplicateClientServerSpanIDs returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -23,7 +23,7 @@ var errTooManySpans = errors.New("cannot assign unique span ID, too many spans i // // Any issues encountered during adjustment are recorded as warnings in the // span. -func SpanIDUniquifier() Adjuster { +func DeduplicateClientServerSpanIDs() Adjuster { return Func(func(traces ptrace.Traces) { adjuster := spanIDDeduper{ spansByID: make(map[pcommon.SpanID][]ptrace.Span), @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) { if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) { newID, err := d.makeUniqueSpanID() if err != nil { - jptrace.AddWarning(span, err.Error()) + jptrace.AddWarnings(span, err.Error()) continue } oldToNewSpanIDs[span.SpanID()] = newID diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go index 5ad6868f4434..282e0dcd50e7 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go @@ -46,7 +46,7 @@ func makeTraces() ptrace.Traces { func TestSpanIDUniquifierTriggered(t *testing.T) { traces := makeTraces() - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -73,7 +73,7 @@ func TestSpanIDUniquifierNotTriggered(t *testing.T) { spans.At(2).CopyTo(newSpans.AppendEmpty()) newSpans.CopyTo(spans) - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() diff --git a/cmd/query/app/querysvc/adjuster/spanlinks.go b/cmd/query/app/querysvc/adjuster/spanlinks.go index 60730fa5ecfe..3fe3cb7ec3a9 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks.go @@ -15,8 +15,8 @@ const ( var _ Adjuster = (*LinksAdjuster)(nil) -// SpanLinks creates an adjuster that removes span links with empty trace IDs. -func SpanLinks() LinksAdjuster { +// RemoveEmptySpanLinks creates an adjuster that removes span links with empty trace IDs. +func RemoveEmptySpanLinks() LinksAdjuster { return LinksAdjuster{} } @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) { newLink := validLinks.AppendEmpty() link.CopyTo(newLink) } else { - jptrace.AddWarning(span, invalidSpanLinkWarning) + jptrace.AddWarnings(span, invalidSpanLinkWarning) } } validLinks.CopyTo(span.Links()) diff --git a/cmd/query/app/querysvc/adjuster/spanlinks_test.go b/cmd/query/app/querysvc/adjuster/spanlinks_test.go index 33a8a5a78a67..ed2f69439cc0 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks_test.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks_test.go @@ -31,7 +31,7 @@ func TestLinksAdjuster(t *testing.T) { spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0})) spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})) - SpanLinks().Adjust(traces) + RemoveEmptySpanLinks().Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() gotSpansA := spans.At(0) diff --git a/cmd/query/app/querysvc/adjuster/standard.go b/cmd/query/app/querysvc/adjuster/standard.go new file mode 100644 index 000000000000..cfbb8915192a --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard.go @@ -0,0 +1,23 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "time" +) + +// StandardAdjusters returns a list of adjusters applied by the query service +// before returning the data to the API clients. +func StandardAdjusters(maxClockSkewAdjust time.Duration) []Adjuster { + return []Adjuster{ + DeduplicateClientServerSpanIDs(), + SortCollections(), + // DeduplicateSpans depends on SortCollections running first + DeduplicateSpans(), + CorrectClockSkew(maxClockSkewAdjust), + NormalizeIPAttributes(), + MoveLibraryAttributes(), + RemoveEmptySpanLinks(), + } +} diff --git a/cmd/query/app/querysvc/adjuster/standard_test.go b/cmd/query/app/querysvc/adjuster/standard_test.go new file mode 100644 index 000000000000..d2fadfe2ec05 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStandardAdjusters(t *testing.T) { + maxClockSkewAdjust := 10 * time.Second + adjusters := StandardAdjusters(maxClockSkewAdjust) + + assert.Len(t, adjusters, 7, "Expected 7 adjusters") + assert.IsType(t, DeduplicateClientServerSpanIDs(), adjusters[0]) + assert.IsType(t, SortCollections(), adjusters[1]) + assert.IsType(t, DeduplicateSpans(), adjusters[2]) + assert.IsType(t, CorrectClockSkew(maxClockSkewAdjust), adjusters[3]) + assert.IsType(t, NormalizeIPAttributes(), adjusters[4]) + assert.IsType(t, MoveLibraryAttributes(), adjusters[5]) + assert.IsType(t, RemoveEmptySpanLinks(), adjusters[6]) +} diff --git a/docker-compose/kafka/docker-compose.yml b/docker-compose/kafka/docker-compose.yml index 58e3061fa8ff..dad341b3a9f6 100644 --- a/docker-compose/kafka/docker-compose.yml +++ b/docker-compose/kafka/docker-compose.yml @@ -108,4 +108,4 @@ services: jaeger-remote-storage: condition: service_healthy links: - - jaeger-remote-storage + - jaeger-remote-storage \ No newline at end of file diff --git a/docker-compose/kafka/v2/docker-compose.yml b/docker-compose/kafka/v2/docker-compose.yml new file mode 100644 index 000000000000..5eb0b1d287ce --- /dev/null +++ b/docker-compose/kafka/v2/docker-compose.yml @@ -0,0 +1,21 @@ +services: + zookeeper: + image: bitnami/zookeeper:3.9.3 + ports: + - "2181:2181" + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + image: bitnami/kafka:2.8.0 + ports: + - "9092:9092" + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 + - KAFKA_LISTENERS=PLAINTEXT://:9092 + - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 + - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper \ No newline at end of file diff --git a/docker-compose/kafka-integration-test/docker-compose.yml b/docker-compose/kafka/v3/docker-compose.yml similarity index 100% rename from docker-compose/kafka-integration-test/docker-compose.yml rename to docker-compose/kafka/v3/docker-compose.yml diff --git a/go.mod b/go.mod index 762d0f365f49..a7eafab345e0 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,7 @@ require ( golang.org/x/net v0.33.0 golang.org/x/sys v0.28.0 google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.0 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 5d8e67a5e7a1..d694e4003f18 100644 --- a/go.sum +++ b/go.sum @@ -964,8 +964,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/jptrace/aggregator.go b/internal/jptrace/aggregator.go new file mode 100644 index 000000000000..f8c4b5c12d8a --- /dev/null +++ b/internal/jptrace/aggregator.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/pkg/iter" +) + +// AggregateTraces aggregates a sequence of trace batches into individual traces. +// +// The `tracesSeq` input must adhere to the chunking requirements of tracestore.Reader.GetTraces. +func AggregateTraces(tracesSeq iter.Seq2[[]ptrace.Traces, error]) iter.Seq2[ptrace.Traces, error] { + return func(yield func(trace ptrace.Traces, err error) bool) { + currentTrace := ptrace.NewTraces() + currentTraceID := pcommon.NewTraceIDEmpty() + + tracesSeq(func(traces []ptrace.Traces, err error) bool { + if err != nil { + yield(ptrace.NewTraces(), err) + return false + } + for _, trace := range traces { + resources := trace.ResourceSpans() + traceID := resources.At(0).ScopeSpans().At(0).Spans().At(0).TraceID() + if currentTraceID == traceID { + mergeTraces(trace, currentTrace) + } else { + if currentTrace.ResourceSpans().Len() > 0 { + if !yield(currentTrace, nil) { + return false + } + } + currentTrace = trace + currentTraceID = traceID + } + } + return true + }) + if currentTrace.ResourceSpans().Len() > 0 { + yield(currentTrace, nil) + } + } +} + +func mergeTraces(src, dest ptrace.Traces) { + resources := src.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + resource.CopyTo(dest.ResourceSpans().AppendEmpty()) + } +} diff --git a/internal/jptrace/aggregator_test.go b/internal/jptrace/aggregator_test.go new file mode 100644 index 000000000000..abeeaaa08455 --- /dev/null +++ b/internal/jptrace/aggregator_test.go @@ -0,0 +1,136 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestAggregateTraces_AggregatesSpansWithSameTraceID(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + trace1Continued := ptrace.NewTraces() + resource2 := trace1Continued.ResourceSpans().AppendEmpty() + scope2 := resource2.ScopeSpans().AppendEmpty() + span2 := scope2.Spans().AppendEmpty() + span2.SetTraceID(pcommon.TraceID([16]byte{1})) + span2.SetName("span2") + + trace2 := ptrace.NewTraces() + resource3 := trace2.ResourceSpans().AppendEmpty() + scope3 := resource3.ScopeSpans().AppendEmpty() + span3 := scope3.Spans().AppendEmpty() + span3.SetTraceID(pcommon.TraceID([16]byte{2})) + span3.SetName("span3") + + trace3 := ptrace.NewTraces() + resource4 := trace3.ResourceSpans().AppendEmpty() + scope4 := resource4.ScopeSpans().AppendEmpty() + span4 := scope4.Spans().AppendEmpty() + span4.SetTraceID(pcommon.TraceID([16]byte{3})) + span4.SetName("span4") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace1, trace1Continued, trace2}, nil) + yield([]ptrace.Traces{trace3}, nil) + } + + var result []ptrace.Traces + AggregateTraces(tracesSeq)(func(trace ptrace.Traces, _ error) bool { + result = append(result, trace) + return true + }) + + require.Len(t, result, 3) + + require.Equal(t, 2, result[0].ResourceSpans().Len()) + require.Equal(t, 1, result[1].ResourceSpans().Len()) + require.Equal(t, 1, result[2].ResourceSpans().Len()) + + gotSpan1 := result[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan1.TraceID(), pcommon.TraceID([16]byte{1})) + require.Equal(t, "span1", gotSpan1.Name()) + + gotSpan2 := result[0].ResourceSpans().At(1).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan2.TraceID(), pcommon.TraceID([16]byte{1})) + require.Equal(t, "span2", gotSpan2.Name()) + + gotSpan3 := result[1].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan3.TraceID(), pcommon.TraceID([16]byte{2})) + require.Equal(t, "span3", gotSpan3.Name()) + + gotSpan4 := result[2].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + require.Equal(t, gotSpan4.TraceID(), pcommon.TraceID([16]byte{3})) + require.Equal(t, "span4", gotSpan4.Name()) +} + +func TestAggregateTraces_YieldsErrorFromTracesSeq(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + if !yield(nil, assert.AnError) { + return + } + yield([]ptrace.Traces{trace1}, nil) // should not get here + } + aggregatedSeq := AggregateTraces(tracesSeq) + + var lastResult ptrace.Traces + var lastErr error + aggregatedSeq(func(trace ptrace.Traces, e error) bool { + lastResult = trace + if e != nil { + lastErr = e + } + return true + }) + + require.ErrorIs(t, lastErr, assert.AnError) + require.Equal(t, ptrace.NewTraces(), lastResult) +} + +func TestAggregateTraces_RespectsEarlyReturn(t *testing.T) { + trace1 := ptrace.NewTraces() + resource1 := trace1.ResourceSpans().AppendEmpty() + scope1 := resource1.ScopeSpans().AppendEmpty() + span1 := scope1.Spans().AppendEmpty() + span1.SetTraceID(pcommon.TraceID([16]byte{1})) + span1.SetName("span1") + + trace2 := ptrace.NewTraces() + resource2 := trace2.ResourceSpans().AppendEmpty() + scope2 := resource2.ScopeSpans().AppendEmpty() + span2 := scope2.Spans().AppendEmpty() + span2.SetTraceID(pcommon.TraceID([16]byte{2})) + span2.SetName("span2") + + tracesSeq := func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace1}, nil) + yield([]ptrace.Traces{trace2}, nil) + } + aggregatedSeq := AggregateTraces(tracesSeq) + + var lastResult ptrace.Traces + aggregatedSeq(func(trace ptrace.Traces, _ error) bool { + lastResult = trace + return false + }) + + require.Equal(t, trace1, lastResult) +} diff --git a/internal/jptrace/spanmap.go b/internal/jptrace/spanmap.go new file mode 100644 index 000000000000..2c5e28f72712 --- /dev/null +++ b/internal/jptrace/spanmap.go @@ -0,0 +1,24 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import "go.opentelemetry.io/collector/pdata/ptrace" + +// SpanMap iterates over all spans in the provided ptrace.Traces and maps each span +// to a key generated by the provided keyFn function. The resulting map has keys of type K +// and values of type ptrace.Span. +func SpanMap[K comparable](traces ptrace.Traces, keyFn func(ptrace.Span) K) map[K]ptrace.Span { + spanMap := make(map[K]ptrace.Span) + for i := 0; i < traces.ResourceSpans().Len(); i++ { + resource := traces.ResourceSpans().At(i) + for j := 0; j < resource.ScopeSpans().Len(); j++ { + scope := resource.ScopeSpans().At(j) + for k := 0; k < scope.Spans().Len(); k++ { + span := scope.Spans().At(k) + spanMap[keyFn(span)] = span + } + } + } + return spanMap +} diff --git a/internal/jptrace/spanmap_test.go b/internal/jptrace/spanmap_test.go new file mode 100644 index 000000000000..8c6bf4aee706 --- /dev/null +++ b/internal/jptrace/spanmap_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestSpanMap(t *testing.T) { + traces := ptrace.NewTraces() + rs := traces.ResourceSpans().AppendEmpty() + ss := rs.ScopeSpans().AppendEmpty() + span1 := ss.Spans().AppendEmpty() + span1.SetName("span1") + span2 := ss.Spans().AppendEmpty() + span2.SetName("span2") + + keyFn := func(span ptrace.Span) string { + return span.Name() + } + + spanMap := SpanMap(traces, keyFn) + + expectedMap := map[string]ptrace.Span{ + "span1": span1, + "span2": span2, + } + assert.Equal(t, expectedMap, spanMap) +} diff --git a/internal/jptrace/warning.go b/internal/jptrace/warning.go index 0d96296c2147..e8c7c69311f9 100644 --- a/internal/jptrace/warning.go +++ b/internal/jptrace/warning.go @@ -13,21 +13,23 @@ const ( // store various warnings produced from transformations, // such as inbound sanitizers and outbound adjusters. // The value type of the attribute is a string slice. - warningsAttribute = "jaeger.internal.warnings" + WarningsAttribute = "jaeger.internal.warnings" ) -func AddWarning(span ptrace.Span, warning string) { - var warnings pcommon.Slice - if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok { - warnings = currWarnings.Slice() +func AddWarnings(span ptrace.Span, warnings ...string) { + var w pcommon.Slice + if currWarnings, ok := span.Attributes().Get(WarningsAttribute); ok { + w = currWarnings.Slice() } else { - warnings = span.Attributes().PutEmptySlice(warningsAttribute) + w = span.Attributes().PutEmptySlice(WarningsAttribute) + } + for _, warning := range warnings { + w.AppendEmpty().SetStr(warning) } - warnings.AppendEmpty().SetStr(warning) } func GetWarnings(span ptrace.Span) []string { - if w, ok := span.Attributes().Get(warningsAttribute); ok { + if w, ok := span.Attributes().Get(WarningsAttribute); ok { warnings := []string{} ws := w.Slice() for i := 0; i < ws.Len(); i++ { diff --git a/internal/jptrace/warning_test.go b/internal/jptrace/warning_test.go index b8e3c38032c0..9bc62437dd46 100644 --- a/internal/jptrace/warning_test.go +++ b/internal/jptrace/warning_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) { warnings.AppendEmpty().SetStr(warn) } } - AddWarning(span, test.newWarn) + AddWarnings(span, test.newWarn) warnings, ok := attrs.Get("jaeger.internal.warnings") assert.True(t, ok) assert.Equal(t, len(test.expected), warnings.Slice().Len()) @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) { } } +func TestAddWarning_MultipleWarnings(t *testing.T) { + span := ptrace.NewSpan() + AddWarnings(span, "warning-1", "warning-2") + warnings, ok := span.Attributes().Get("jaeger.internal.warnings") + require.True(t, ok) + require.Equal(t, "warning-1", warnings.Slice().At(0).Str()) + require.Equal(t, "warning-2", warnings.Slice().At(1).Str()) +} + func TestGetWarnings(t *testing.T) { tests := []struct { name string diff --git a/internal/tools/go.mod b/internal/tools/go.mod index 10129a59f7eb..8837c27e97b7 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -5,7 +5,7 @@ go 1.23.0 require ( github.com/golangci/golangci-lint v1.62.2 github.com/josephspurrier/goversioninfo v1.4.1 - github.com/vektra/mockery/v2 v2.50.0 + github.com/vektra/mockery/v2 v2.50.1 mvdan.cc/gofumpt v0.7.0 ) diff --git a/internal/tools/go.sum b/internal/tools/go.sum index 0ee69b73eca9..edd177546107 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -567,8 +567,8 @@ github.com/uudashr/gocognit v1.1.3 h1:l+a111VcDbKfynh+airAy/DJQKaXh2m9vkoysMPSZy github.com/uudashr/gocognit v1.1.3/go.mod h1:aKH8/e8xbTRBwjbCkwZ8qt4l2EpKXl31KMHgSS+lZ2U= github.com/uudashr/iface v1.2.1 h1:vHHyzAUmWZ64Olq6NZT3vg/z1Ws56kyPdBOd5kTXDF8= github.com/uudashr/iface v1.2.1/go.mod h1:4QvspiRd3JLPAEXBQ9AiZpLbJlrWWgRChOKDJEuQTdg= -github.com/vektra/mockery/v2 v2.50.0 h1:0GYRH38nKiRghwUq+0aJXG1sT3yyTYj/J1xQRM8kGzQ= -github.com/vektra/mockery/v2 v2.50.0/go.mod h1:xO2DeYemEPC2tCzIZ+a1tifZ/7Laf/Chxg3vlc+oDsI= +github.com/vektra/mockery/v2 v2.50.1 h1:EmIvCwAkQOpkmWiGDYllNqWSuVQRuQ/yGVyRICWfMQ0= +github.com/vektra/mockery/v2 v2.50.1/go.mod h1:xO2DeYemEPC2tCzIZ+a1tifZ/7Laf/Chxg3vlc+oDsI= github.com/xen0n/gosmopolitan v1.2.2 h1:/p2KTnMzwRexIW8GlKawsTWOxn7UHA+jCMF/V8HHtvU= github.com/xen0n/gosmopolitan v1.2.2/go.mod h1:7XX7Mj61uLYrj0qmeN0zi7XDon9JRAEhYQqAPLVNTeg= github.com/yagipy/maintidx v1.0.0 h1:h5NvIsCz+nRDapQ0exNv4aJ0yXSI0420omVANTv3GJM= diff --git a/model/ids.go b/model/ids.go index 5bf93dc43bee..4c7b77c6aa94 100644 --- a/model/ids.go +++ b/model/ids.go @@ -261,3 +261,19 @@ func (s *SpanID) UnmarshalJSON(data []byte) error { func (s *SpanID) UnmarshalJSONPB(_ *jsonpb.Unmarshaler, b []byte) error { return s.UnmarshalJSON(b) } + +// ToOTELSpanID converts the SpanID to OTEL's representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func (s SpanID) ToOTELSpanID() pcommon.SpanID { + spanID := [8]byte{} + binary.BigEndian.PutUint64(spanID[:], uint64(s)) + return pcommon.SpanID(spanID) +} + +// ToOTELSpanID converts OTEL's SpanID to the model representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func SpanIDFromOTEL(spanID pcommon.SpanID) SpanID { + return SpanID(binary.BigEndian.Uint64(spanID[:])) +} diff --git a/model/ids_test.go b/model/ids_test.go index 40c6f801a5c6..ef2a9a65bebf 100644 --- a/model/ids_test.go +++ b/model/ids_test.go @@ -137,3 +137,55 @@ func TestTraceIDFromOTEL(t *testing.T) { } require.Equal(t, expected, model.TraceIDFromOTEL(otelTraceID)) } + +func TestToOTELSpanID(t *testing.T) { + tests := []struct { + name string + spanID model.SpanID + expected pcommon.SpanID + }{ + { + name: "zero span ID", + spanID: model.NewSpanID(0), + expected: pcommon.NewSpanIDEmpty(), + }, + { + name: "non-zero span ID", + spanID: model.NewSpanID(1), + expected: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := test.spanID.ToOTELSpanID() + assert.Equal(t, test.expected, actual) + }) + } +} + +func TestSpanIDFromOTEL(t *testing.T) { + tests := []struct { + name string + otelSpanID pcommon.SpanID + expected model.SpanID + }{ + { + name: "zero span ID", + otelSpanID: pcommon.NewSpanIDEmpty(), + expected: model.NewSpanID(0), + }, + { + name: "non-zero span ID", + otelSpanID: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + expected: model.NewSpanID(1), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := model.SpanIDFromOTEL(test.otelSpanID) + assert.Equal(t, test.expected, actual) + }) + } +} diff --git a/pkg/otelsemconv/semconv.go b/pkg/otelsemconv/semconv.go index 9acb3b88e2c6..5427dbfcc26f 100644 --- a/pkg/otelsemconv/semconv.go +++ b/pkg/otelsemconv/semconv.go @@ -24,6 +24,10 @@ const ( DBSystemKey = semconv.DBSystemKey PeerServiceKey = semconv.PeerServiceKey HTTPResponseStatusCodeKey = semconv.HTTPResponseStatusCodeKey + + HostIDKey = semconv.HostIDKey + HostIPKey = semconv.HostIPKey + HostNameKey = semconv.HostNameKey ) var HTTPResponseStatusCode = semconv.HTTPResponseStatusCode diff --git a/renovate.json b/renovate.json index ee3da55b62c2..d7433a85f670 100644 --- a/renovate.json +++ b/renovate.json @@ -22,7 +22,8 @@ "docker-compose/elasticsearch/v6/docker-compose.yml", "docker-compose/elasticsearch/v7/docker-compose.yml", "docker-compose/elasticsearch/v8/docker-compose.yml", - "docker-compose/kafka-integration-test/docker-compose.yml", + "docker-compose/kafka/v2/docker-compose.yml", + "docker-compose/kafka/v3/docker-compose.yml", "docker-compose/kafka/docker-compose.yml", "docker-compose/monitor/docker-compose.yml", "docker-compose/opensearch/v1/docker-compose.yml", diff --git a/scripts/kafka-integration-test.sh b/scripts/kafka-integration-test.sh index 0777e7efb1d0..c62f73a44f55 100755 --- a/scripts/kafka-integration-test.sh +++ b/scripts/kafka-integration-test.sh @@ -5,38 +5,63 @@ set -euf -o pipefail -compose_file="docker-compose/kafka-integration-test/docker-compose.yml" +compose_file="" jaeger_version="" +kafka_version="" manage_kafka="true" success="false" -print_help() { - echo "Usage: $0 [-K] -j " +usage() { + echo "Usage: $0 [-K] -j -v " echo " -K: do not start or stop Kafka container (useful for local testing)" echo " -j: major version of Jaeger to test (v1|v2)" + echo " -v: kafka major version (3.x|2.x)" exit 1 } +check_arg() { + if [ ! $# -eq 3 ]; then + echo "ERROR: need exactly three arguments" + usage + fi +} + parse_args() { - while getopts "j:Kh" opt; do + while getopts "j:v:Kh" opt; do case "${opt}" in j) jaeger_version=${OPTARG} ;; + v) + case ${OPTARG} in + 3.x) + kafka_version="v3" + ;; + 2.x) + kafka_version="v2" + ;; + *) + echo "Error: Invalid Kafka version. Valid options are 3.x or 2.x" + usage + ;; + esac + ;; K) manage_kafka="false" ;; *) - print_help + usage ;; esac done - if [ "$jaeger_version" != "v1" ] && [ "$jaeger_version" != "v2" ]; then + if [[ "$jaeger_version" != "v1" && "$jaeger_version" != "v2" ]]; then echo "Error: Invalid Jaeger version. Valid options are v1 or v2" - print_help + usage fi + compose_file="docker-compose/kafka/${kafka_version}/docker-compose.yml" } + setup_kafka() { echo "Starting Kafka using Docker Compose..." docker compose -f "${compose_file}" up -d kafka @@ -89,7 +114,7 @@ run_integration_test() { make jaeger-v2-storage-integration-test else echo "Unknown Jaeger version ${jaeger_version}." - print_help + usage fi } diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index 5426ff8e8d8a..5eef0ed8aca6 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -22,6 +22,7 @@ type Reader interface { // Chunking requirements: // - A single ptrace.Traces chunk MUST NOT contain spans from multiple traces. // - Large traces MAY be split across multiple, *consecutive* ptrace.Traces chunks. + // - Each returned ptrace.Traces object MUST NOT be empty. // // Edge cases: // - If no spans are found for any given trace ID, the ID is ignored. diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index a3d2293dd252..225e9267d89f 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -7,7 +7,6 @@ import ( "context" "errors" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -60,8 +59,8 @@ func (tr *TraceReader) GetTraces( return } batch := &model.Batch{Spans: t.GetSpans()} - tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) - if !yield([]ptrace.Traces{tr}, err) || err != nil { + tr := V1BatchesToTraces([]*model.Batch{batch}) + if !yield([]ptrace.Traces{tr}, nil) { return } } @@ -105,7 +104,7 @@ func (tr *TraceReader) FindTraces( } for _, trace := range traces { batch := &model.Batch{Spans: trace.GetSpans()} - otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + otelTrace := V1BatchesToTraces([]*model.Batch{batch}) if !yield([]ptrace.Traces{otelTrace}, nil) { return } diff --git a/storage_v2/v1adapter/translator.go b/storage_v2/v1adapter/translator.go new file mode 100644 index 000000000000..dda96a5630a1 --- /dev/null +++ b/storage_v2/v1adapter/translator.go @@ -0,0 +1,88 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/model" +) + +// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) +// to Jaeger model batches ([]*model.Batch). +func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { + batches := jaegerTranslator.ProtoFromTraces(traces) + spanMap := createSpanMapFromBatches(batches) + transferWarningsToModelSpans(traces, spanMap) + return batches +} + +// V1BatchesToTraces converts Jaeger model batches ([]*model.Batch) +// to OpenTelemetry traces (ptrace.Traces). +func V1BatchesToTraces(batches []*model.Batch) ptrace.Traces { + traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error + spanMap := jptrace.SpanMap(traces, func(s ptrace.Span) pcommon.SpanID { + return s.SpanID() + }) + transferWarningsToOTLPSpans(batches, spanMap) + return traces +} + +func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { + spanMap := make(map[model.SpanID]*model.Span) + for _, batch := range batches { + for _, span := range batch.Spans { + spanMap[span.SpanID] = span + } + } + return spanMap +} + +func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + otelSpan := spans.At(k) + warnings := jptrace.GetWarnings(otelSpan) + if len(warnings) == 0 { + continue + } + if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok { + span.Warnings = append(span.Warnings, warnings...) + // filter out the warning tag + span.Tags = filterTags(span.Tags, jptrace.WarningsAttribute) + } + } + } + } +} + +func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) { + for _, batch := range batches { + for _, span := range batch.Spans { + if len(span.Warnings) == 0 { + continue + } + if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok { + jptrace.AddWarnings(otelSpan, span.Warnings...) + } + } + } +} + +func filterTags(tags []model.KeyValue, keyToRemove string) []model.KeyValue { + var filteredTags []model.KeyValue + for _, tag := range tags { + if tag.Key != keyToRemove { + filteredTags = append(filteredTags, tag) + } + } + return filteredTags +} diff --git a/storage_v2/v1adapter/translator_test.go b/storage_v2/v1adapter/translator_test.go new file mode 100644 index 000000000000..e78a71f423fa --- /dev/null +++ b/storage_v2/v1adapter/translator_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package v1adapter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/model" +) + +func TestProtoFromTraces_AddsWarnings(t *testing.T) { + traces := ptrace.NewTraces() + rs1 := traces.ResourceSpans().AppendEmpty() + ss1 := rs1.ScopeSpans().AppendEmpty() + span1 := ss1.Spans().AppendEmpty() + span1.SetName("test-span-1") + span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + jptrace.AddWarnings(span1, "test-warning-1") + jptrace.AddWarnings(span1, "test-warning-2") + span1.Attributes().PutStr("key", "value") + + ss2 := rs1.ScopeSpans().AppendEmpty() + span2 := ss2.Spans().AppendEmpty() + span2.SetName("test-span-2") + span2.SetSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16})) + + rs2 := traces.ResourceSpans().AppendEmpty() + ss3 := rs2.ScopeSpans().AppendEmpty() + span3 := ss3.Spans().AppendEmpty() + span3.SetName("test-span-3") + span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24})) + jptrace.AddWarnings(span3, "test-warning-3") + + batches := ProtoFromTraces(traces) + + assert.Len(t, batches, 2) + + assert.Len(t, batches[0].Spans, 2) + assert.Equal(t, "test-span-1", batches[0].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, batches[0].Spans[0].Warnings) + assert.Equal(t, []model.KeyValue{{Key: "key", VStr: "value"}}, batches[0].Spans[0].Tags) + assert.Equal(t, "test-span-2", batches[0].Spans[1].OperationName) + assert.Empty(t, batches[0].Spans[1].Warnings) + assert.Empty(t, batches[0].Spans[1].Tags) + + assert.Len(t, batches[1].Spans, 1) + assert.Equal(t, "test-span-3", batches[1].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings) + assert.Empty(t, batches[1].Spans[0].Tags) +} + +func TestProtoToTraces_AddsWarnings(t *testing.T) { + batch1 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-1", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-1", + SpanID: model.NewSpanID(1), + Warnings: []string{"test-warning-1", "test-warning-2"}, + }, + { + OperationName: "test-span-2", + SpanID: model.NewSpanID(2), + }, + }, + } + batch2 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-2", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-3", + SpanID: model.NewSpanID(3), + Warnings: []string{"test-warning-3"}, + }, + }, + } + batches := []*model.Batch{batch1, batch2} + traces := V1BatchesToTraces(batches) + + assert.Equal(t, 2, traces.ResourceSpans().Len()) + + spanMap := jptrace.SpanMap(traces, func(s ptrace.Span) string { + return s.Name() + }) + + span1 := spanMap["test-span-1"] + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID()) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, jptrace.GetWarnings(span1)) + + span2 := spanMap["test-span-2"] + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID()) + assert.Empty(t, jptrace.GetWarnings(span2)) + + span3 := spanMap["test-span-3"] + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) + assert.Equal(t, []string{"test-warning-3"}, jptrace.GetWarnings(span3)) +} diff --git a/storage_v2/v1adapter/writer.go b/storage_v2/v1adapter/writer.go index c0a9d4ced990..5602d1ae8f7d 100644 --- a/storage_v2/v1adapter/writer.go +++ b/storage_v2/v1adapter/writer.go @@ -7,7 +7,6 @@ import ( "context" "errors" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -26,7 +25,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) tracestore.Writer { // WriteTraces implements tracestore.Writer. func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := ProtoFromTraces(td) var errs []error for _, batch := range batches { for _, span := range batch.Spans {