From ad0de07020d59b15260566e3cadaf415c48a18a9 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 11 Dec 2024 18:53:02 -0500 Subject: [PATCH 01/35] Create Query Service V2 To Operate on OTEL Data Model Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 141 +++++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 cmd/query/app/querysvc/query_service_v2.go diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go new file mode 100644 index 00000000000..51d727502ed --- /dev/null +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -0,0 +1,141 @@ +// // Copyright (c) 2019 The Jaeger Authors. +// // SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/model/adjuster" + "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +// TODO: Remove query_service.go and rename query_service_v2.go +// to query_service.go once all components have been migrated to +// operate on the OTEL data model. +var errNoArchiveSpanStorageV2 = errors.New("archive span storage was not configured") + +const ( + defaultMaxClockSkewAdjustV2 = time.Second +) + +// QueryServiceOptions has optional members of QueryService +type QueryServiceOptionsV2 struct { + ArchiveTraceReader tracestore.Reader + ArchiveTraceWriter tracestore.Writer + Adjuster adjuster.Adjuster +} + +// StorageCapabilities is a feature flag for query service +type StorageCapabilitiesV2 struct { + ArchiveStorage bool `json:"archiveStorage"` + // TODO: Maybe add metrics Storage here + // SupportRegex bool + // SupportTagFilter bool +} + +// QueryService contains span utils required by the query-service. +type QueryServiceV2 struct { + traceReader tracestore.Reader + dependencyReader depstore.Reader + options QueryServiceOptionsV2 +} + +// NewQueryService returns a new QueryService. +func NewQueryServiceV2(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService { + qsvc := &QueryService{ + traceReader: traceReader, + dependencyReader: dependencyReader, + options: options, + } + + if qsvc.options.Adjuster == nil { + qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjustV2)...) + } + return qsvc +} + +// GetTrace is the queryService implementation of tracestore.Reader.GetTrace +func (qs QueryServiceV2) GetTrace(ctx context.Context, traceID pcommon.TraceID) (ptrace.Traces, error) { + trace, err := qs.traceReader.GetTrace(ctx, traceID) + if errors.Is(err, spanstore.ErrTraceNotFound) { + if qs.options.ArchiveTraceReader == nil { + return ptrace.NewTraces(), err + } + trace, err = qs.options.ArchiveTraceReader.GetTrace(ctx, traceID) + } + return trace, err +} + +// GetServices is the queryService implementation of tracestore.Reader.GetServices +func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) { + return qs.traceReader.GetServices(ctx) +} + +// GetOperations is the queryService implementation of tracestore.Reader.GetOperations +func (qs QueryServiceV2) GetOperations( + ctx context.Context, + query tracestore.OperationQueryParameters, +) ([]tracestore.Operation, error) { + return qs.traceReader.GetOperations(ctx, query) +} + +// FindTraces is the queryService implementation of tracestore.Reader.FindTraces +func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParameters) ([]ptrace.Traces, error) { + return qs.traceReader.FindTraces(ctx, query) +} + +// ArchiveTrace is the queryService utility to archive traces. +func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error { + if qs.options.ArchiveTraceWriter == nil { + return errNoArchiveSpanStorageV2 + } + trace, err := qs.GetTrace(ctx, traceID) + if err != nil { + return err + } + return qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) +} + +// Adjust applies adjusters to the trace. +func (qs QueryServiceV2) Adjust(trace *model.Trace) (*model.Trace, error) { + return qs.options.Adjuster.Adjust(trace) +} + +// GetDependencies implements depstore.Reader.GetDependencies +func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ + StartTime: endTs.Add(-lookback), + EndTime: endTs, + }) +} + +// GetCapabilities returns the features supported by the query service. +func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { + return StorageCapabilities{ + ArchiveStorage: qs.options.hasArchiveStorage(), + } +} + +// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. +func (opts *QueryServiceOptionsV2) InitArchiveStorage( + archiveReader tracestore.Reader, + archiveWriter tracestore.Writer, + logger *zap.Logger) { + opts.ArchiveTraceReader = archiveReader + opts.ArchiveTraceWriter = archiveWriter +} + +// hasArchiveStorage returns true if archive storage reader/writer are initialized. +func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { + return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil +} From a33ab6c1eec33b8928883621d40851f279be1639 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 24 Dec 2024 09:56:40 -0500 Subject: [PATCH 02/35] Address Build Failures And Feedback Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service.go | 6 --- cmd/query/app/querysvc/query_service_v2.go | 63 +++++++++++----------- 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 724a2536583..e074559a371 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -19,12 +19,6 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) -var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") - -const ( - defaultMaxClockSkewAdjust = time.Second -) - // QueryServiceOptions has optional members of QueryService type QueryServiceOptions struct { ArchiveSpanReader spanstore.Reader diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 51d727502ed..121c8bc0a04 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -1,5 +1,5 @@ -// // Copyright (c) 2019 The Jaeger Authors. -// // SPDX-License-Identifier: Apache-2.0 +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 package querysvc @@ -10,22 +10,19 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/model/adjuster" + "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) -// TODO: Remove query_service.go and rename query_service_v2.go -// to query_service.go once all components have been migrated to -// operate on the OTEL data model. -var errNoArchiveSpanStorageV2 = errors.New("archive span storage was not configured") +var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") const ( - defaultMaxClockSkewAdjustV2 = time.Second + defaultMaxClockSkewAdjust = time.Second ) // QueryServiceOptions has optional members of QueryService @@ -51,29 +48,36 @@ type QueryServiceV2 struct { } // NewQueryService returns a new QueryService. -func NewQueryServiceV2(traceReader tracestore.Reader, dependencyReader depstore.Reader, options QueryServiceOptions) *QueryService { - qsvc := &QueryService{ +func NewQueryServiceV2( + traceReader tracestore.Reader, + dependencyReader depstore.Reader, + options QueryServiceOptionsV2, +) *QueryServiceV2 { + qsvc := &QueryServiceV2{ traceReader: traceReader, dependencyReader: dependencyReader, options: options, } if qsvc.options.Adjuster == nil { - qsvc.options.Adjuster = adjuster.Sequence(StandardAdjusters(defaultMaxClockSkewAdjustV2)...) + qsvc.options.Adjuster = adjuster.Sequence(adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) } return qsvc } // GetTrace is the queryService implementation of tracestore.Reader.GetTrace -func (qs QueryServiceV2) GetTrace(ctx context.Context, traceID pcommon.TraceID) (ptrace.Traces, error) { - trace, err := qs.traceReader.GetTrace(ctx, traceID) +func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { + traceIter := qs.traceReader.GetTraces(ctx, traceIDs...) + _, err := iter.FlattenWithErrors(traceIter) if errors.Is(err, spanstore.ErrTraceNotFound) { if qs.options.ArchiveTraceReader == nil { - return ptrace.NewTraces(), err + return func(yield func([]ptrace.Traces, error) bool) { + yield(nil, err) + } } - trace, err = qs.options.ArchiveTraceReader.GetTrace(ctx, traceID) + traceIter = qs.options.ArchiveTraceReader.GetTraces(ctx, traceIDs...) } - return trace, err + return traceIter } // GetServices is the queryService implementation of tracestore.Reader.GetServices @@ -90,25 +94,29 @@ func (qs QueryServiceV2) GetOperations( } // FindTraces is the queryService implementation of tracestore.Reader.FindTraces -func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParameters) ([]ptrace.Traces, error) { +func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { return qs.traceReader.FindTraces(ctx, query) } // ArchiveTrace is the queryService utility to archive traces. func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error { if qs.options.ArchiveTraceWriter == nil { - return errNoArchiveSpanStorageV2 + return errNoArchiveSpanStorage } - trace, err := qs.GetTrace(ctx, traceID) + var err error + traces, err := iter.FlattenWithErrors(qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID})) if err != nil { return err } - return qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) + for _, trace := range traces { + err = errors.Join(err, qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)) + } + return err } // Adjust applies adjusters to the trace. -func (qs QueryServiceV2) Adjust(trace *model.Trace) (*model.Trace, error) { - return qs.options.Adjuster.Adjust(trace) +func (qs QueryServiceV2) Adjust(trace ptrace.Traces) { + qs.options.Adjuster.Adjust(trace) } // GetDependencies implements depstore.Reader.GetDependencies @@ -126,15 +134,6 @@ func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { } } -// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. -func (opts *QueryServiceOptionsV2) InitArchiveStorage( - archiveReader tracestore.Reader, - archiveWriter tracestore.Writer, - logger *zap.Logger) { - opts.ArchiveTraceReader = archiveReader - opts.ArchiveTraceWriter = archiveWriter -} - // hasArchiveStorage returns true if archive storage reader/writer are initialized. func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil From ffad62997ac66090fa8ba1d307a366a5ba33c78e Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 24 Dec 2024 11:36:15 -0500 Subject: [PATCH 03/35] Fix ArchiveTrace And GetTraces To Operate On Iterators Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 65 ++++++++++++++++------ 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 121c8bc0a04..47f00f0e0ef 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -14,7 +14,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" - "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -67,17 +66,40 @@ func NewQueryServiceV2( // GetTrace is the queryService implementation of tracestore.Reader.GetTrace func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { - traceIter := qs.traceReader.GetTraces(ctx, traceIDs...) - _, err := iter.FlattenWithErrors(traceIter) - if errors.Is(err, spanstore.ErrTraceNotFound) { - if qs.options.ArchiveTraceReader == nil { - return func(yield func([]ptrace.Traces, error) bool) { - yield(nil, err) + getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...) + return func(yield func([]ptrace.Traces, error) bool) { + foundTraceIDs := make(map[pcommon.TraceID]struct{}) + getTracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + return yield(nil, err) + } + for _, trace := range traces { + resources := trace.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + foundTraceIDs[span.TraceID()] = struct{}{} + } + } + } + } + return yield(traces, nil) + }) + if qs.options.ArchiveTraceReader != nil { + missingTraceIDs := []tracestore.GetTraceParams{} + for _, id := range traceIDs { + if _, found := foundTraceIDs[id.TraceID]; !found { + missingTraceIDs = append(missingTraceIDs, id) + } + } + if len(missingTraceIDs) > 0 { + qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)(yield) } } - traceIter = qs.options.ArchiveTraceReader.GetTraces(ctx, traceIDs...) } - return traceIter } // GetServices is the queryService implementation of tracestore.Reader.GetServices @@ -103,15 +125,22 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.Trace if qs.options.ArchiveTraceWriter == nil { return errNoArchiveSpanStorage } - var err error - traces, err := iter.FlattenWithErrors(qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID})) - if err != nil { - return err - } - for _, trace := range traces { - err = errors.Join(err, qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace)) - } - return err + getTracesIter := qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID}) + var archiveErr error + getTracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + archiveErr = err + return false + } + for _, trace := range traces { + err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) + if err != nil { + return false + } + } + return true + }) + return archiveErr } // Adjust applies adjusters to the trace. From 1d721ac7c239d5e253b883ffebf44eed0f416a75 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 24 Dec 2024 11:41:35 -0500 Subject: [PATCH 04/35] Change Adjuster To Work On Seq Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 47f00f0e0ef..4400fd87bab 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -144,8 +144,13 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.Trace } // Adjust applies adjusters to the trace. -func (qs QueryServiceV2) Adjust(trace ptrace.Traces) { - qs.options.Adjuster.Adjust(trace) +func (qs QueryServiceV2) Adjust(tracesIter iter.Seq[[]ptrace.Traces]) { + tracesIter(func(traces []ptrace.Traces) bool { + for _, trace := range traces { + qs.options.Adjuster.Adjust(trace) + } + return true + }) } // GetDependencies implements depstore.Reader.GetDependencies From a6df503f13fdc54908d9b3e0838eaa37380beec4 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 24 Dec 2024 11:43:44 -0500 Subject: [PATCH 05/35] Fix Error Capture Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 4400fd87bab..d012086096f 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -135,6 +135,7 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.Trace for _, trace := range traces { err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) if err != nil { + archiveErr = err return false } } From 60a875ad51f18d4ccd9f4aa70a19d213376aea5d Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Wed, 25 Dec 2024 00:25:05 -0500 Subject: [PATCH 06/35] Address Feedback From PR Review Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service.go | 8 -------- cmd/query/app/querysvc/query_service_v2.go | 15 ++++++++++----- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index e074559a371..2c1c192f290 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -26,14 +26,6 @@ type QueryServiceOptions struct { Adjuster adjuster.Adjuster } -// StorageCapabilities is a feature flag for query service -type StorageCapabilities struct { - ArchiveStorage bool `json:"archiveStorage"` - // TODO: Maybe add metrics Storage here - // SupportRegex bool - // SupportTagFilter bool -} - // QueryService contains span utils required by the query-service. type QueryService struct { traceReader tracestore.Reader diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index d012086096f..d8c165937d7 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -32,7 +32,7 @@ type QueryServiceOptionsV2 struct { } // StorageCapabilities is a feature flag for query service -type StorageCapabilitiesV2 struct { +type StorageCapabilities struct { ArchiveStorage bool `json:"archiveStorage"` // TODO: Maybe add metrics Storage here // SupportRegex bool @@ -59,13 +59,18 @@ func NewQueryServiceV2( } if qsvc.options.Adjuster == nil { - qsvc.options.Adjuster = adjuster.Sequence(adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) + qsvc.options.Adjuster = adjuster.Sequence( + adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) } return qsvc } -// GetTrace is the queryService implementation of tracestore.Reader.GetTrace -func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { +// GetTraces retrieves traces with given trace IDs from the primary reader, +// and if any of them are not found it then queries the archive reader. +// The iterator is single-use: once consumed, it cannot be used again. +func (qs QueryServiceV2) GetTraces( + ctx context.Context, + traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...) return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) @@ -89,7 +94,7 @@ func (qs QueryServiceV2) GetTraces(ctx context.Context, traceIDs ...tracestore.G return yield(traces, nil) }) if qs.options.ArchiveTraceReader != nil { - missingTraceIDs := []tracestore.GetTraceParams{} + var missingTraceIDs []tracestore.GetTraceParams for _, id := range traceIDs { if _, found := foundTraceIDs[id.TraceID]; !found { missingTraceIDs = append(missingTraceIDs, id) From d32ba485199591cbf3d73f72b4a6dec4ced16da2 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 18:48:37 -0500 Subject: [PATCH 07/35] Run Linter Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index d8c165937d7..add0e4d2570 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -70,7 +70,8 @@ func NewQueryServiceV2( // The iterator is single-use: once consumed, it cannot be used again. func (qs QueryServiceV2) GetTraces( ctx context.Context, - traceIDs ...tracestore.GetTraceParams) iter.Seq2[[]ptrace.Traces, error] { + traceIDs ...tracestore.GetTraceParams, +) iter.Seq2[[]ptrace.Traces, error] { getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...) return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) From ecab080ceb87527d64fd28166a8b40958175da19 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 19:54:08 -0500 Subject: [PATCH 08/35] Update Query Service To Perform Adjustments Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 60 ++++++++++++++++------ 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index add0e4d2570..73f9a18ff8f 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -46,6 +46,23 @@ type QueryServiceV2 struct { options QueryServiceOptionsV2 } +// GetTraceParams defines the parameters for retrieving traces using the GetTraces function. +type GetTraceParams struct { + // TraceIDs is a slice of trace identifiers to fetch. + TraceIDs []tracestore.GetTraceParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptionsV2.Adjuster. + RawTraces bool +} + +// TraceQueryParams represents the parameters for querying a batch of traces. +type TraceQueryParams struct { + tracestore.TraceQueryParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptionsV2.Adjuster. + RawTraces bool +} + // NewQueryService returns a new QueryService. func NewQueryServiceV2( traceReader tracestore.Reader, @@ -70,9 +87,10 @@ func NewQueryServiceV2( // The iterator is single-use: once consumed, it cannot be used again. func (qs QueryServiceV2) GetTraces( ctx context.Context, - traceIDs ...tracestore.GetTraceParams, + params GetTraceParams, ) iter.Seq2[[]ptrace.Traces, error] { - getTracesIter := qs.traceReader.GetTraces(ctx, traceIDs...) + getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) + // TODO: aggregate the traces return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) getTracesIter(func(traces []ptrace.Traces, err error) bool { @@ -80,6 +98,9 @@ func (qs QueryServiceV2) GetTraces( return yield(nil, err) } for _, trace := range traces { + if !params.RawTraces { + qs.options.Adjuster.Adjust(trace) + } resources := trace.ResourceSpans() for i := 0; i < resources.Len(); i++ { scopes := resources.At(i).ScopeSpans() @@ -96,7 +117,7 @@ func (qs QueryServiceV2) GetTraces( }) if qs.options.ArchiveTraceReader != nil { var missingTraceIDs []tracestore.GetTraceParams - for _, id := range traceIDs { + for _, id := range params.TraceIDs { if _, found := foundTraceIDs[id.TraceID]; !found { missingTraceIDs = append(missingTraceIDs, id) } @@ -122,8 +143,21 @@ func (qs QueryServiceV2) GetOperations( } // FindTraces is the queryService implementation of tracestore.Reader.FindTraces -func (qs QueryServiceV2) FindTraces(ctx context.Context, query tracestore.TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { - return qs.traceReader.FindTraces(ctx, query) +func (qs QueryServiceV2) FindTraces(ctx context.Context, query TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { + return func(yield func([]ptrace.Traces, error) bool) { + tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) + tracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + return yield(nil, err) + } + if !query.RawTraces { + for _, trace := range traces { + qs.options.Adjuster.Adjust(trace) + } + } + return yield(traces, nil) + }) + } } // ArchiveTrace is the queryService utility to archive traces. @@ -131,7 +165,11 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.Trace if qs.options.ArchiveTraceWriter == nil { return errNoArchiveSpanStorage } - getTracesIter := qs.GetTraces(ctx, tracestore.GetTraceParams{TraceID: traceID}) + getTracesIter := qs.GetTraces( + ctx, GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{{TraceID: traceID}}, + }, + ) var archiveErr error getTracesIter(func(traces []ptrace.Traces, err error) bool { if err != nil { @@ -150,16 +188,6 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.Trace return archiveErr } -// Adjust applies adjusters to the trace. -func (qs QueryServiceV2) Adjust(tracesIter iter.Seq[[]ptrace.Traces]) { - tracesIter(func(traces []ptrace.Traces) bool { - for _, trace := range traces { - qs.options.Adjuster.Adjust(trace) - } - return true - }) -} - // GetDependencies implements depstore.Reader.GetDependencies func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ From 9ee2d38163a3539a248d5093f123e0712479dddc Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 20:36:43 -0500 Subject: [PATCH 09/35] Fix Signature of ArchiveTrace Function Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 73f9a18ff8f..0a7e1e47cd2 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -161,14 +161,12 @@ func (qs QueryServiceV2) FindTraces(ctx context.Context, query TraceQueryParams) } // ArchiveTrace is the queryService utility to archive traces. -func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, traceID pcommon.TraceID) error { +func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { if qs.options.ArchiveTraceWriter == nil { return errNoArchiveSpanStorage } getTracesIter := qs.GetTraces( - ctx, GetTraceParams{ - TraceIDs: []tracestore.GetTraceParams{{TraceID: traceID}}, - }, + ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, ) var archiveErr error getTracesIter(func(traces []ptrace.Traces, err error) bool { From 57ef9e7bd6dc2beabad4061973250786db5d154a Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 20:37:07 -0500 Subject: [PATCH 10/35] Add Some Unit Tests Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_test.go | 4 - .../app/querysvc/query_service_v2_test.go | 153 ++++++++++++++++++ 2 files changed, 153 insertions(+), 4 deletions(-) create mode 100644 cmd/query/app/querysvc/query_service_v2_test.go diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 64ef7500e3f..5d34fc54919 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -28,11 +28,7 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) -const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) - var ( - defaultDependencyLookbackDuration = time.Hour * 24 - mockTraceID = model.NewTraceID(0, 123456) mockTrace = &model.Trace{ Spans: []*model.Span{ diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go new file mode 100644 index 00000000000..063da025ccb --- /dev/null +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -0,0 +1,153 @@ +package querysvc + +import ( + "context" + "testing" + "time" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + depstoremocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) + +var ( + defaultDependencyLookbackDuration = time.Hour * 24 + + testTraceID = pcommon.TraceID([16]byte{1}) +) + +type testQueryServiceV2 struct { + queryService *QueryServiceV2 + traceReader *tracestoremocks.Reader + depsReader *depstoremocks.Reader + + archiveTraceReader *tracestoremocks.Reader + archiveTraceWriter *tracestoremocks.Writer +} + +type testOptionV2 func(*testQueryServiceV2, *QueryServiceOptionsV2) + +func withArchiveTraceReader() testOptionV2 { + return func(tqs *testQueryServiceV2, options *QueryServiceOptionsV2) { + r := &tracestoremocks.Reader{} + tqs.archiveTraceReader = r + options.ArchiveTraceReader = r + } +} + +func withArchiveTraceWriter() testOptionV2 { + return func(tqs *testQueryServiceV2, options *QueryServiceOptionsV2) { + r := &tracestoremocks.Writer{} + tqs.archiveTraceWriter = r + options.ArchiveTraceWriter = r + } +} + +func initializeTestServiceV2(optionAppliers ...testOptionV2) *testQueryServiceV2 { + traceReader := &tracestoremocks.Reader{} + dependencyStorage := &depstoremocks.Reader{} + + options := QueryServiceOptionsV2{} + + tqs := testQueryServiceV2{ + traceReader: traceReader, + depsReader: dependencyStorage, + } + + for _, optApplier := range optionAppliers { + optApplier(&tqs, &options) + } + + tqs.queryService = NewQueryServiceV2(traceReader, dependencyStorage, options) + return &tqs +} + +func TestGetServicesV2(t *testing.T) { + tqs := initializeTestServiceV2() + expectedServices := []string{"trifle", "bling"} + tqs.traceReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + + type contextKey string + ctx := context.Background() + actualServices, err := tqs.queryService.GetServices(context.WithValue(ctx, contextKey("foo"), "bar")) + require.NoError(t, err) + assert.Equal(t, expectedServices, actualServices) +} + +func TestGetOperationsV2(t *testing.T) { + tqs := initializeTestServiceV2() + expectedOperations := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} + operationQuery := tracestore.OperationQueryParameters{ServiceName: "abc/trifle"} + tqs.traceReader.On( + "GetOperations", + mock.AnythingOfType("*context.valueCtx"), + operationQuery, + ).Return(expectedOperations, nil).Once() + + type contextKey string + ctx := context.Background() + actualOperations, err := tqs.queryService.GetOperations(context.WithValue(ctx, contextKey("foo"), "bar"), operationQuery) + require.NoError(t, err) + assert.Equal(t, expectedOperations, actualOperations) +} + +func TestArchiveTraceV2_NoOptions(t *testing.T) { + tqs := initializeTestServiceV2() + + type contextKey string + ctx := context.Background() + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) + assert.Equal(t, errNoArchiveSpanStorage, err) +} + +func TestGetDependenciesV2(t *testing.T) { + tqs := initializeTestServiceV2() + expectedDependencies := []model.DependencyLink{ + { + Parent: "killer", + Child: "queen", + CallCount: 12, + }, + } + endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) + tqs.depsReader.On( + "GetDependencies", + mock.Anything, // context.Context + depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }).Return(expectedDependencies, nil).Times(1) + + actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration) + require.NoError(t, err) + assert.Equal(t, expectedDependencies, actualDependencies) +} + +func TestGetCapabilitiesV2(t *testing.T) { + tqs := initializeTestServiceV2() + expectedStorageCapabilities := StorageCapabilities{ + ArchiveStorage: false, + } + assert.Equal(t, expectedStorageCapabilities, tqs.queryService.GetCapabilities()) +} + +func TestGetCapabilitiesWithSupportsArchiveV2(t *testing.T) { + tqs := initializeTestServiceV2(withArchiveTraceReader(), withArchiveTraceWriter()) + + expectedStorageCapabilities := StorageCapabilities{ + ArchiveStorage: true, + } + assert.Equal(t, expectedStorageCapabilities, tqs.queryService.GetCapabilities()) +} From 40d9a68ed68ee468ed19e4cb957c9e9465a977de Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 20:41:24 -0500 Subject: [PATCH 11/35] Add Missing License Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 063da025ccb..869fd85b024 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -1,3 +1,6 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + package querysvc import ( @@ -5,15 +8,16 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage_v2/depstore" depstoremocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/tracestore" tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" ) const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) From 32491f5292bb48e3e8a09910b0136bfecd55bba3 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 20:42:34 -0500 Subject: [PATCH 12/35] Adjust Archive Trace Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 0a7e1e47cd2..0605e125be9 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -123,7 +123,18 @@ func (qs QueryServiceV2) GetTraces( } } if len(missingTraceIDs) > 0 { - qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...)(yield) + getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) + getArchiveTracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + return yield(nil, err) + } + for _, trace := range traces { + if !params.RawTraces { + qs.options.Adjuster.Adjust(trace) + } + } + return yield(traces, nil) + }) } } } From 3d6af5fe2211c77c3ac7274356d74ab42c9bf383 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 20:58:31 -0500 Subject: [PATCH 13/35] Aggregate Traces Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 41 +++++++++++----------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 0605e125be9..d4650934bde 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage_v2/depstore" @@ -90,30 +91,29 @@ func (qs QueryServiceV2) GetTraces( params GetTraceParams, ) iter.Seq2[[]ptrace.Traces, error] { getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) + aggregatedTraces := jptrace.AggregateTraces(getTracesIter) // TODO: aggregate the traces return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) - getTracesIter(func(traces []ptrace.Traces, err error) bool { + aggregatedTraces(func(trace ptrace.Traces, err error) bool { if err != nil { return yield(nil, err) } - for _, trace := range traces { - if !params.RawTraces { - qs.options.Adjuster.Adjust(trace) - } - resources := trace.ResourceSpans() - for i := 0; i < resources.Len(); i++ { - scopes := resources.At(i).ScopeSpans() - for j := 0; j < scopes.Len(); j++ { - spans := scopes.At(j).Spans() - for k := 0; k < spans.Len(); k++ { - span := spans.At(k) - foundTraceIDs[span.TraceID()] = struct{}{} - } + if !params.RawTraces { + qs.options.Adjuster.Adjust(trace) + } + resources := trace.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + foundTraceIDs[span.TraceID()] = struct{}{} } } } - return yield(traces, nil) + return yield([]ptrace.Traces{trace}, nil) }) if qs.options.ArchiveTraceReader != nil { var missingTraceIDs []tracestore.GetTraceParams @@ -124,16 +124,15 @@ func (qs QueryServiceV2) GetTraces( } if len(missingTraceIDs) > 0 { getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) - getArchiveTracesIter(func(traces []ptrace.Traces, err error) bool { + aggregatedArchiveTraces := jptrace.AggregateTraces(getArchiveTracesIter) + aggregatedArchiveTraces(func(trace ptrace.Traces, err error) bool { if err != nil { return yield(nil, err) } - for _, trace := range traces { - if !params.RawTraces { - qs.options.Adjuster.Adjust(trace) - } + if !params.RawTraces { + qs.options.Adjuster.Adjust(trace) } - return yield(traces, nil) + return yield([]ptrace.Traces{trace}, nil) }) } } From ab95ab7c5503e6e85675b8c2f94568822b944c73 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 21:05:12 -0500 Subject: [PATCH 14/35] Remove TODO Comment Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index d4650934bde..63456659304 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -92,7 +92,6 @@ func (qs QueryServiceV2) GetTraces( ) iter.Seq2[[]ptrace.Traces, error] { getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) aggregatedTraces := jptrace.AggregateTraces(getTracesIter) - // TODO: aggregate the traces return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) aggregatedTraces(func(trace ptrace.Traces, err error) bool { @@ -153,7 +152,10 @@ func (qs QueryServiceV2) GetOperations( } // FindTraces is the queryService implementation of tracestore.Reader.FindTraces -func (qs QueryServiceV2) FindTraces(ctx context.Context, query TraceQueryParams) iter.Seq2[[]ptrace.Traces, error] { +func (qs QueryServiceV2) FindTraces( + ctx context.Context, + query TraceQueryParams, +) iter.Seq2[[]ptrace.Traces, error] { return func(yield func([]ptrace.Traces, error) bool) { tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) tracesIter(func(traces []ptrace.Traces, err error) bool { From 005d8d7dc7d874e0b879de9ed027dbad73a73d45 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 22:04:04 -0500 Subject: [PATCH 15/35] Use SpanIter Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 63456659304..2b558e4161a 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -101,17 +101,10 @@ func (qs QueryServiceV2) GetTraces( if !params.RawTraces { qs.options.Adjuster.Adjust(trace) } - resources := trace.ResourceSpans() - for i := 0; i < resources.Len(); i++ { - scopes := resources.At(i).ScopeSpans() - for j := 0; j < scopes.Len(); j++ { - spans := scopes.At(j).Spans() - for k := 0; k < spans.Len(); k++ { - span := spans.At(k) - foundTraceIDs[span.TraceID()] = struct{}{} - } - } - } + jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { + foundTraceIDs[span.TraceID()] = struct{}{} + return true + }) return yield([]ptrace.Traces{trace}, nil) }) if qs.options.ArchiveTraceReader != nil { From 71717851858bbd2bf7543c1c96bfe621e8d2ead6 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 22:10:38 -0500 Subject: [PATCH 16/35] Add Proceed Check Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 2b558e4161a..9d2d1a0fd00 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -94,9 +94,11 @@ func (qs QueryServiceV2) GetTraces( aggregatedTraces := jptrace.AggregateTraces(getTracesIter) return func(yield func([]ptrace.Traces, error) bool) { foundTraceIDs := make(map[pcommon.TraceID]struct{}) + proceed := true aggregatedTraces(func(trace ptrace.Traces, err error) bool { if err != nil { - return yield(nil, err) + proceed = yield(nil, err) + return proceed } if !params.RawTraces { qs.options.Adjuster.Adjust(trace) @@ -105,9 +107,10 @@ func (qs QueryServiceV2) GetTraces( foundTraceIDs[span.TraceID()] = struct{}{} return true }) - return yield([]ptrace.Traces{trace}, nil) + proceed = yield([]ptrace.Traces{trace}, nil) + return proceed }) - if qs.options.ArchiveTraceReader != nil { + if proceed && qs.options.ArchiveTraceReader != nil { var missingTraceIDs []tracestore.GetTraceParams for _, id := range params.TraceIDs { if _, found := foundTraceIDs[id.TraceID]; !found { From e5a4b1b319209fd8322517c17052da8cc420345b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 22:57:36 -0500 Subject: [PATCH 17/35] Address Feedback From PR Review Signed-off-by: Mahad Zaryab --- .../internal/integration/trace_reader.go | 2 +- cmd/query/app/querysvc/query_service_v2.go | 2 +- .../app/querysvc/query_service_v2_test.go | 48 +++++++++---------- plugin/storage/integration/integration.go | 2 +- storage_v2/tracestore/mocks/Reader.go | 8 ++-- storage_v2/tracestore/reader.go | 6 +-- storage_v2/v1adapter/reader.go | 2 +- storage_v2/v1adapter/reader_test.go | 2 +- 8 files changed, 35 insertions(+), 37 deletions(-) diff --git a/cmd/jaeger/internal/integration/trace_reader.go b/cmd/jaeger/internal/integration/trace_reader.go index cfa102de13a..a61edf63f43 100644 --- a/cmd/jaeger/internal/integration/trace_reader.go +++ b/cmd/jaeger/internal/integration/trace_reader.go @@ -90,7 +90,7 @@ func (r *traceReader) GetServices(ctx context.Context) ([]string, error) { return res.Services, nil } -func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { var operations []tracestore.Operation res, err := r.client.GetOperations(ctx, &api_v3.GetOperationsRequest{ Service: query.ServiceName, diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 9d2d1a0fd00..44dde229caa 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -142,7 +142,7 @@ func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) { // GetOperations is the queryService implementation of tracestore.Reader.GetOperations func (qs QueryServiceV2) GetOperations( ctx context.Context, - query tracestore.OperationQueryParameters, + query tracestore.OperationQueryParams, ) ([]tracestore.Operation, error) { return qs.traceReader.GetOperations(ctx, query) } diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 869fd85b024..3ca1b72fb3b 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -55,7 +55,7 @@ func withArchiveTraceWriter() testOptionV2 { } } -func initializeTestServiceV2(optionAppliers ...testOptionV2) *testQueryServiceV2 { +func initializeTestServiceV2(opts ...testOptionV2) *testQueryServiceV2 { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} @@ -66,8 +66,8 @@ func initializeTestServiceV2(optionAppliers ...testOptionV2) *testQueryServiceV2 depsReader: dependencyStorage, } - for _, optApplier := range optionAppliers { - optApplier(&tqs, &options) + for _, opt := range opts { + opt(&tqs, &options) } tqs.queryService = NewQueryServiceV2(traceReader, dependencyStorage, options) @@ -76,31 +76,27 @@ func initializeTestServiceV2(optionAppliers ...testOptionV2) *testQueryServiceV2 func TestGetServicesV2(t *testing.T) { tqs := initializeTestServiceV2() - expectedServices := []string{"trifle", "bling"} - tqs.traceReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + expected := []string{"trifle", "bling"} + tqs.traceReader.On("GetServices", mock.Anything).Return(expected, nil).Once() - type contextKey string - ctx := context.Background() - actualServices, err := tqs.queryService.GetServices(context.WithValue(ctx, contextKey("foo"), "bar")) + actualServices, err := tqs.queryService.GetServices(context.Background()) require.NoError(t, err) - assert.Equal(t, expectedServices, actualServices) + assert.Equal(t, expected, actualServices) } func TestGetOperationsV2(t *testing.T) { tqs := initializeTestServiceV2() - expectedOperations := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} - operationQuery := tracestore.OperationQueryParameters{ServiceName: "abc/trifle"} + expected := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} + operationQuery := tracestore.OperationQueryParams{ServiceName: "abc/trifle"} tqs.traceReader.On( "GetOperations", - mock.AnythingOfType("*context.valueCtx"), + mock.Anything, operationQuery, - ).Return(expectedOperations, nil).Once() + ).Return(expected, nil).Once() - type contextKey string - ctx := context.Background() - actualOperations, err := tqs.queryService.GetOperations(context.WithValue(ctx, contextKey("foo"), "bar"), operationQuery) + actualOperations, err := tqs.queryService.GetOperations(context.Background(), operationQuery) require.NoError(t, err) - assert.Equal(t, expectedOperations, actualOperations) + assert.Equal(t, expected, actualOperations) } func TestArchiveTraceV2_NoOptions(t *testing.T) { @@ -118,7 +114,7 @@ func TestArchiveTraceV2_NoOptions(t *testing.T) { func TestGetDependenciesV2(t *testing.T) { tqs := initializeTestServiceV2() - expectedDependencies := []model.DependencyLink{ + expected := []model.DependencyLink{ { Parent: "killer", Child: "queen", @@ -132,26 +128,28 @@ func TestGetDependenciesV2(t *testing.T) { depstore.QueryParameters{ StartTime: endTs.Add(-defaultDependencyLookbackDuration), EndTime: endTs, - }).Return(expectedDependencies, nil).Times(1) + }).Return(expected, nil).Times(1) - actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), time.Unix(0, 1476374248550*millisToNanosMultiplier), defaultDependencyLookbackDuration) + actualDependencies, err := tqs.queryService.GetDependencies( + context.Background(), endTs, + defaultDependencyLookbackDuration) require.NoError(t, err) - assert.Equal(t, expectedDependencies, actualDependencies) + assert.Equal(t, expected, actualDependencies) } func TestGetCapabilitiesV2(t *testing.T) { tqs := initializeTestServiceV2() - expectedStorageCapabilities := StorageCapabilities{ + expected := StorageCapabilities{ ArchiveStorage: false, } - assert.Equal(t, expectedStorageCapabilities, tqs.queryService.GetCapabilities()) + assert.Equal(t, expected, tqs.queryService.GetCapabilities()) } func TestGetCapabilitiesWithSupportsArchiveV2(t *testing.T) { tqs := initializeTestServiceV2(withArchiveTraceReader(), withArchiveTraceWriter()) - expectedStorageCapabilities := StorageCapabilities{ + expected := StorageCapabilities{ ArchiveStorage: true, } - assert.Equal(t, expectedStorageCapabilities, tqs.queryService.GetCapabilities()) + assert.Equal(t, expected, tqs.queryService.GetCapabilities()) } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 159e958c5bf..982eba3e9fc 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -270,7 +270,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.TraceReader.GetOperations(context.Background(), - tracestore.OperationQueryParameters{ServiceName: "example-service-1"}) + tracestore.OperationQueryParams{ServiceName: "example-service-1"}) if err != nil { t.Log(err) return false diff --git a/storage_v2/tracestore/mocks/Reader.go b/storage_v2/tracestore/mocks/Reader.go index 5f87e43581b..fd0d0a7e708 100644 --- a/storage_v2/tracestore/mocks/Reader.go +++ b/storage_v2/tracestore/mocks/Reader.go @@ -66,7 +66,7 @@ func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryPar } // GetOperations provides a mock function with given fields: ctx, query -func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { ret := _m.Called(ctx, query) if len(ret) == 0 { @@ -75,10 +75,10 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ var r0 []tracestore.Operation var r1 error - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) ([]tracestore.Operation, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) ([]tracestore.Operation, error)); ok { return rf(ctx, query) } - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) []tracestore.Operation); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) []tracestore.Operation); ok { r0 = rf(ctx, query) } else { if ret.Get(0) != nil { @@ -86,7 +86,7 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ } } - if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParameters) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParams) error); ok { r1 = rf(ctx, query) } else { r1 = ret.Error(1) diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index 5eef0ed8aca..86160412c1f 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -36,7 +36,7 @@ type Reader interface { // GetOperations returns all operation names for a given service // known to the backend from spans within its retention period. - GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error) + GetOperations(ctx context.Context, query OperationQueryParams) ([]Operation, error) // FindTraces returns an iterator that retrieves traces matching query parameters. // The iterator is single-use: once consumed, it cannot be used again. @@ -101,8 +101,8 @@ func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryPar } } -// OperationQueryParameters contains parameters of query operations, empty spanKind means get operations for all kinds of span. -type OperationQueryParameters struct { +// OperationQueryParams contains parameters of query operations, empty spanKind means get operations for all kinds of span. +type OperationQueryParams struct { ServiceName string SpanKind string } diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index 225e9267d89..7ef6921d5e9 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -73,7 +73,7 @@ func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) { func (tr *TraceReader) GetOperations( ctx context.Context, - query tracestore.OperationQueryParameters, + query tracestore.OperationQueryParams, ) ([]tracestore.Operation, error) { o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{ ServiceName: query.ServiceName, diff --git a/storage_v2/v1adapter/reader_test.go b/storage_v2/v1adapter/reader_test.go index c142ee99c13..e1952c0a825 100644 --- a/storage_v2/v1adapter/reader_test.go +++ b/storage_v2/v1adapter/reader_test.go @@ -240,7 +240,7 @@ func TestTraceReader_GetOperationsDelegatesResponse(t *testing.T) { } operations, err := traceReader.GetOperations( context.Background(), - tracestore.OperationQueryParameters{ + tracestore.OperationQueryParams{ ServiceName: "service-a", SpanKind: "server", }) From f887d0928a9b1fd52b89ffc36f19f034f37a4327 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sun, 29 Dec 2024 23:12:44 -0500 Subject: [PATCH 18/35] Create Receive Traces Helper Function Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 68 +++++++++------------- 1 file changed, 29 insertions(+), 39 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 44dde229caa..4a12d69f387 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -91,25 +91,8 @@ func (qs QueryServiceV2) GetTraces( params GetTraceParams, ) iter.Seq2[[]ptrace.Traces, error] { getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) - aggregatedTraces := jptrace.AggregateTraces(getTracesIter) return func(yield func([]ptrace.Traces, error) bool) { - foundTraceIDs := make(map[pcommon.TraceID]struct{}) - proceed := true - aggregatedTraces(func(trace ptrace.Traces, err error) bool { - if err != nil { - proceed = yield(nil, err) - return proceed - } - if !params.RawTraces { - qs.options.Adjuster.Adjust(trace) - } - jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { - foundTraceIDs[span.TraceID()] = struct{}{} - return true - }) - proceed = yield([]ptrace.Traces{trace}, nil) - return proceed - }) + foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces) if proceed && qs.options.ArchiveTraceReader != nil { var missingTraceIDs []tracestore.GetTraceParams for _, id := range params.TraceIDs { @@ -119,16 +102,7 @@ func (qs QueryServiceV2) GetTraces( } if len(missingTraceIDs) > 0 { getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) - aggregatedArchiveTraces := jptrace.AggregateTraces(getArchiveTracesIter) - aggregatedArchiveTraces(func(trace ptrace.Traces, err error) bool { - if err != nil { - return yield(nil, err) - } - if !params.RawTraces { - qs.options.Adjuster.Adjust(trace) - } - return yield([]ptrace.Traces{trace}, nil) - }) + qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces) } } } @@ -154,17 +128,7 @@ func (qs QueryServiceV2) FindTraces( ) iter.Seq2[[]ptrace.Traces, error] { return func(yield func([]ptrace.Traces, error) bool) { tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) - tracesIter(func(traces []ptrace.Traces, err error) bool { - if err != nil { - return yield(nil, err) - } - if !query.RawTraces { - for _, trace := range traces { - qs.options.Adjuster.Adjust(trace) - } - } - return yield(traces, nil) - }) + qs.receiveTraces(tracesIter, yield, query.RawTraces) } } @@ -213,3 +177,29 @@ func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil } + +func (qs QueryServiceV2) receiveTraces( + seq iter.Seq2[[]ptrace.Traces, error], + yield func([]ptrace.Traces, error) bool, + rawTraces bool, +) (map[pcommon.TraceID]struct{}, bool) { + aggregatedTraces := jptrace.AggregateTraces(seq) + foundTraceIDs := make(map[pcommon.TraceID]struct{}) + proceed := true + aggregatedTraces(func(trace ptrace.Traces, err error) bool { + if err != nil { + proceed = yield(nil, err) + return proceed + } + if !rawTraces { + qs.options.Adjuster.Adjust(trace) + } + jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { + foundTraceIDs[span.TraceID()] = struct{}{} + return true + }) + proceed = yield([]ptrace.Traces{trace}, nil) + return proceed + }) + return foundTraceIDs, proceed +} From 959d2d4788b812768a158b313c5ab2d86ace6eb7 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 11:41:57 -0500 Subject: [PATCH 19/35] Add Unit Tests For Get Traces Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 124 ++++++++++++++++++ 1 file changed, 124 insertions(+) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 3ca1b72fb3b..d2ec37e4f20 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -5,6 +5,7 @@ package querysvc import ( "context" + "fmt" "testing" "time" @@ -12,8 +13,10 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage_v2/depstore" depstoremocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" "github.com/jaegertracing/jaeger/storage_v2/tracestore" @@ -74,6 +77,127 @@ func initializeTestServiceV2(opts ...testOptionV2) *testQueryServiceV2 { return &tqs } +func makeTestTrace() ptrace.Traces { + trace := ptrace.NewTraces() + resources := trace.ResourceSpans().AppendEmpty() + scopes := resources.ScopeSpans().AppendEmpty() + + spanA := scopes.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + spanB := scopes.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + spanB.Attributes() + + return trace +} + +func TestGetTracesSuccess(t *testing.T) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + tqs := initializeTestServiceV2() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter, nil).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + } + var gotTraces []ptrace.Traces + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + getTracesIter(func(traces []ptrace.Traces, err error) bool { + gotTraces = append(gotTraces, traces...) + return true + }) + + require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestGetTracesWithRawTraces(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortTagsAndLogFields adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortTagsAndLogFields adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestServiceV2() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter, nil).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + RawTraces: test.rawTraces, + } + var gotTraces []ptrace.Traces + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + getTracesIter(func(traces []ptrace.Traces, err error) bool { + gotTraces = append(gotTraces, traces...) + return true + }) + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + func TestGetServicesV2(t *testing.T) { tqs := initializeTestServiceV2() expected := []string{"trifle", "bling"} From d4682497b21e3014a0d11a008699c5139c3b97b9 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 11:50:20 -0500 Subject: [PATCH 20/35] Fix Lint Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index d2ec37e4f20..088d131bea1 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -112,7 +112,7 @@ func TestGetTracesSuccess(t *testing.T) { } var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.GetTraces(context.Background(), params) - getTracesIter(func(traces []ptrace.Traces, err error) bool { + getTracesIter(func(traces []ptrace.Traces, _ error) bool { gotTraces = append(gotTraces, traces...) return true }) @@ -187,7 +187,7 @@ func TestGetTracesWithRawTraces(t *testing.T) { } var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.GetTraces(context.Background(), params) - getTracesIter(func(traces []ptrace.Traces, err error) bool { + getTracesIter(func(traces []ptrace.Traces, _ error) bool { gotTraces = append(gotTraces, traces...) return true }) From f6d157e6e55c1d087970717612edea4a5e79aefc Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 12:02:11 -0500 Subject: [PATCH 21/35] Add Unit Tests For Find Traces Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 088d131bea1..916d096a647 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -223,6 +223,132 @@ func TestGetOperationsV2(t *testing.T) { assert.Equal(t, expected, actualOperations) } +func TestFindTracesV2(t *testing.T) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + tqs := initializeTestServiceV2() + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + now := time.Now() + tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }). + Return(responseIter, nil).Once() + + query := TraceQueryParams{ + TraceQueryParams: tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }, + } + var gotTraces []ptrace.Traces + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + getTracesIter(func(traces []ptrace.Traces, _ error) bool { + gotTraces = append(gotTraces, traces...) + return true + }) + + require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestFindTracesWithRawTracesV2(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortTagsAndLogFields adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortTagsAndLogFields adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestServiceV2() + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + now := time.Now() + tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }). + Return(responseIter, nil).Once() + + query := TraceQueryParams{ + TraceQueryParams: tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }, + RawTraces: test.rawTraces, + } + var gotTraces []ptrace.Traces + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + getTracesIter(func(traces []ptrace.Traces, _ error) bool { + gotTraces = append(gotTraces, traces...) + return true + }) + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + func TestArchiveTraceV2_NoOptions(t *testing.T) { tqs := initializeTestServiceV2() From d2f1c955dc5e5e318111efeea826018536397e7b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 23:06:48 -0500 Subject: [PATCH 22/35] Add Test For Archive Trace Writer Error Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2.go | 3 +-- .../app/querysvc/query_service_v2_test.go | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 4a12d69f387..23a5df838d4 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -149,8 +149,7 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetT for _, trace := range traces { err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) if err != nil { - archiveErr = err - return false + archiveErr = errors.Join(archiveErr, err) } } return true diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 916d096a647..5ee1bd0837d 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -5,6 +5,7 @@ package querysvc import ( "context" + "errors" "fmt" "testing" "time" @@ -362,6 +363,26 @@ func TestArchiveTraceV2_NoOptions(t *testing.T) { assert.Equal(t, errNoArchiveSpanStorage, err) } +func TestArchiveTraceV2_ArchiveWriterError(t *testing.T) { + tqs := initializeTestServiceV2(withArchiveTraceWriter()) + + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter, nil).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(errors.New("cannot save")).Twice() + + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + joinErr := tqs.queryService.ArchiveTrace(context.Background(), query) + require.EqualError(t, joinErr, "cannot save") +} + func TestGetDependenciesV2(t *testing.T) { tqs := initializeTestServiceV2() expected := []model.DependencyLink{ From 6f69d6395254c7434ed6a64e4473c124d548410b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 23:11:39 -0500 Subject: [PATCH 23/35] Add Test For Archive Trace Writer Success Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 5ee1bd0837d..7315717c48c 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -5,7 +5,6 @@ package querysvc import ( "context" - "errors" "fmt" "testing" "time" @@ -373,14 +372,34 @@ func TestArchiveTraceV2_ArchiveWriterError(t *testing.T) { tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(responseIter, nil).Once() tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). - Return(errors.New("cannot save")).Twice() + Return(assert.AnError).Once() query := tracestore.GetTraceParams{ TraceID: testTraceID, } - joinErr := tqs.queryService.ArchiveTrace(context.Background(), query) - require.EqualError(t, joinErr, "cannot save") + err := tqs.queryService.ArchiveTrace(context.Background(), query) + require.ErrorIs(t, err, assert.AnError) +} + +func TestArchiveTraceV2_Success(t *testing.T) { + tqs := initializeTestServiceV2(withArchiveTraceWriter()) + + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter, nil).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(nil).Once() + + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + err := tqs.queryService.ArchiveTrace(context.Background(), query) + require.NoError(t, err) } func TestGetDependenciesV2(t *testing.T) { From 92502856db6695de775d02460266845d0e8c56b9 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 23:17:55 -0500 Subject: [PATCH 24/35] Write Test For Get Traces In Archive Storage Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 7315717c48c..e74f2d6ec24 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -198,6 +198,44 @@ func TestGetTracesWithRawTraces(t *testing.T) { } } +func TestGetTraces_TraceInArchiveStorage(t *testing.T) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, nil) + }) + + archiveResponseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + tqs := initializeTestServiceV2(withArchiveTraceReader()) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter, nil).Once() + tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(archiveResponseIter, nil).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + } + var gotTraces []ptrace.Traces + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + getTracesIter(func(traces []ptrace.Traces, _ error) bool { + gotTraces = append(gotTraces, traces...) + return true + }) + + require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + func TestGetServicesV2(t *testing.T) { tqs := initializeTestServiceV2() expected := []string{"trifle", "bling"} From 1e0c5b3f8336874b6ec9f958d23921a88f71dfd7 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 23:31:40 -0500 Subject: [PATCH 25/35] Move Tests To Separate Package To Simplify Naming Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service.go | 2 +- cmd/query/app/querysvc/query_service_test.go | 6 +- cmd/query/app/querysvc/query_service_v2.go | 25 +++--- .../app/querysvc/query_service_v2_test.go | 87 ++++++++++--------- 4 files changed, 62 insertions(+), 58 deletions(-) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 0edf884ddc9..be6a2bc256e 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -123,7 +123,7 @@ func (qs QueryService) FindTraces(ctx context.Context, query *TraceQueryParamete // ArchiveTrace is the queryService utility to archive traces. func (qs QueryService) ArchiveTrace(ctx context.Context, query spanstore.GetTraceParameters) error { if qs.options.ArchiveSpanWriter == nil { - return errNoArchiveSpanStorage + return ErrNoArchiveSpanStorage } trace, err := qs.GetTrace(ctx, GetTraceParameters{GetTraceParameters: query}) if err != nil { diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 5d34fc54919..ff2b744d74b 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -28,7 +28,11 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) +const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) + var ( + defaultDependencyLookbackDuration = time.Hour * 24 + mockTraceID = model.NewTraceID(0, 123456) mockTrace = &model.Trace{ Spans: []*model.Span{ @@ -398,7 +402,7 @@ func TestArchiveTraceNoOptions(t *testing.T) { } err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) - assert.Equal(t, errNoArchiveSpanStorage, err) + assert.Equal(t, ErrNoArchiveSpanStorage, err) } // Test QueryService.ArchiveTrace() with ArchiveSpanWriter but invalid traceID. diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/query_service_v2.go index 23a5df838d4..28be19f198f 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/query_service_v2.go @@ -19,17 +19,21 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) -var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") +var ErrNoArchiveSpanStorage = errors.New("archive span storage was not configured") const ( defaultMaxClockSkewAdjust = time.Second ) -// QueryServiceOptions has optional members of QueryService +// QueryServiceOptionsV2 holds the configuration options for the V2 QueryService. type QueryServiceOptionsV2 struct { + // ArchiveTraceReader is used to read archived traces from the storage. ArchiveTraceReader tracestore.Reader + // ArchiveTraceWriter is used to write traces to the archive storage. ArchiveTraceWriter tracestore.Writer - Adjuster adjuster.Adjuster + // Adjuster is used to adjust traces before they are returned to the client. + // If not set, the default adjuster will be used. + Adjuster adjuster.Adjuster } // StorageCapabilities is a feature flag for query service @@ -40,7 +44,7 @@ type StorageCapabilities struct { // SupportTagFilter bool } -// QueryService contains span utils required by the query-service. +// QueryServiceV2 provides methods to query data from the storage. type QueryServiceV2 struct { traceReader tracestore.Reader dependencyReader depstore.Reader @@ -64,7 +68,6 @@ type TraceQueryParams struct { RawTraces bool } -// NewQueryService returns a new QueryService. func NewQueryServiceV2( traceReader tracestore.Reader, dependencyReader depstore.Reader, @@ -108,12 +111,10 @@ func (qs QueryServiceV2) GetTraces( } } -// GetServices is the queryService implementation of tracestore.Reader.GetServices func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) { return qs.traceReader.GetServices(ctx) } -// GetOperations is the queryService implementation of tracestore.Reader.GetOperations func (qs QueryServiceV2) GetOperations( ctx context.Context, query tracestore.OperationQueryParams, @@ -121,7 +122,6 @@ func (qs QueryServiceV2) GetOperations( return qs.traceReader.GetOperations(ctx, query) } -// FindTraces is the queryService implementation of tracestore.Reader.FindTraces func (qs QueryServiceV2) FindTraces( ctx context.Context, query TraceQueryParams, @@ -132,10 +132,12 @@ func (qs QueryServiceV2) FindTraces( } } -// ArchiveTrace is the queryService utility to archive traces. +// ArchiveTrace archives a trace specified by the given query parameters. +// If the ArchiveTraceWriter is not configured, it returns +// an error indicating that there is no archive span storage available. func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { if qs.options.ArchiveTraceWriter == nil { - return errNoArchiveSpanStorage + return ErrNoArchiveSpanStorage } getTracesIter := qs.GetTraces( ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, @@ -157,7 +159,6 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetT return archiveErr } -// GetDependencies implements depstore.Reader.GetDependencies func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ StartTime: endTs.Add(-lookback), @@ -165,14 +166,12 @@ func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, l }) } -// GetCapabilities returns the features supported by the query service. func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { return StorageCapabilities{ ArchiveStorage: qs.options.hasArchiveStorage(), } } -// hasArchiveStorage returns true if archive storage reader/writer are initialized. func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil } diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index e74f2d6ec24..8921a15a3fa 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 -package querysvc +package querysvc_test import ( "context" @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage_v2/depstore" @@ -31,8 +32,8 @@ var ( testTraceID = pcommon.TraceID([16]byte{1}) ) -type testQueryServiceV2 struct { - queryService *QueryServiceV2 +type testQueryService struct { + queryService *querysvc.QueryServiceV2 traceReader *tracestoremocks.Reader depsReader *depstoremocks.Reader @@ -40,31 +41,31 @@ type testQueryServiceV2 struct { archiveTraceWriter *tracestoremocks.Writer } -type testOptionV2 func(*testQueryServiceV2, *QueryServiceOptionsV2) +type testOption func(*testQueryService, *querysvc.QueryServiceOptionsV2) -func withArchiveTraceReader() testOptionV2 { - return func(tqs *testQueryServiceV2, options *QueryServiceOptionsV2) { +func withArchiveTraceReader() testOption { + return func(tqs *testQueryService, options *querysvc.QueryServiceOptionsV2) { r := &tracestoremocks.Reader{} tqs.archiveTraceReader = r options.ArchiveTraceReader = r } } -func withArchiveTraceWriter() testOptionV2 { - return func(tqs *testQueryServiceV2, options *QueryServiceOptionsV2) { +func withArchiveTraceWriter() testOption { + return func(tqs *testQueryService, options *querysvc.QueryServiceOptionsV2) { r := &tracestoremocks.Writer{} tqs.archiveTraceWriter = r options.ArchiveTraceWriter = r } } -func initializeTestServiceV2(opts ...testOptionV2) *testQueryServiceV2 { +func initializeTestService(opts ...testOption) *testQueryService { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} - options := QueryServiceOptionsV2{} + options := querysvc.QueryServiceOptionsV2{} - tqs := testQueryServiceV2{ + tqs := testQueryService{ traceReader: traceReader, depsReader: dependencyStorage, } @@ -73,7 +74,7 @@ func initializeTestServiceV2(opts ...testOptionV2) *testQueryServiceV2 { opt(&tqs, &options) } - tqs.queryService = NewQueryServiceV2(traceReader, dependencyStorage, options) + tqs.queryService = querysvc.NewQueryServiceV2(traceReader, dependencyStorage, options) return &tqs } @@ -99,11 +100,11 @@ func TestGetTracesSuccess(t *testing.T) { yield([]ptrace.Traces{makeTestTrace()}, nil) }) - tqs := initializeTestServiceV2() + tqs := initializeTestService() tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(responseIter, nil).Once() - params := GetTraceParams{ + params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ { TraceID: testTraceID, @@ -173,11 +174,11 @@ func TestGetTracesWithRawTraces(t *testing.T) { yield([]ptrace.Traces{trace}, nil) }) - tqs := initializeTestServiceV2() + tqs := initializeTestService() tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(responseIter, nil).Once() - params := GetTraceParams{ + params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ { TraceID: testTraceID, @@ -207,13 +208,13 @@ func TestGetTraces_TraceInArchiveStorage(t *testing.T) { yield([]ptrace.Traces{makeTestTrace()}, nil) }) - tqs := initializeTestServiceV2(withArchiveTraceReader()) + tqs := initializeTestService(withArchiveTraceReader()) tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(responseIter, nil).Once() tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(archiveResponseIter, nil).Once() - params := GetTraceParams{ + params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ { TraceID: testTraceID, @@ -236,8 +237,8 @@ func TestGetTraces_TraceInArchiveStorage(t *testing.T) { require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) } -func TestGetServicesV2(t *testing.T) { - tqs := initializeTestServiceV2() +func TestGetServices(t *testing.T) { + tqs := initializeTestService() expected := []string{"trifle", "bling"} tqs.traceReader.On("GetServices", mock.Anything).Return(expected, nil).Once() @@ -246,8 +247,8 @@ func TestGetServicesV2(t *testing.T) { assert.Equal(t, expected, actualServices) } -func TestGetOperationsV2(t *testing.T) { - tqs := initializeTestServiceV2() +func TestGetOperations(t *testing.T) { + tqs := initializeTestService() expected := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} operationQuery := tracestore.OperationQueryParams{ServiceName: "abc/trifle"} tqs.traceReader.On( @@ -261,12 +262,12 @@ func TestGetOperationsV2(t *testing.T) { assert.Equal(t, expected, actualOperations) } -func TestFindTracesV2(t *testing.T) { +func TestFindTraces(t *testing.T) { responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { yield([]ptrace.Traces{makeTestTrace()}, nil) }) - tqs := initializeTestServiceV2() + tqs := initializeTestService() duration, err := time.ParseDuration("20ms") require.NoError(t, err) now := time.Now() @@ -279,7 +280,7 @@ func TestFindTracesV2(t *testing.T) { }). Return(responseIter, nil).Once() - query := TraceQueryParams{ + query := querysvc.TraceQueryParams{ TraceQueryParams: tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", @@ -304,7 +305,7 @@ func TestFindTracesV2(t *testing.T) { require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) } -func TestFindTracesWithRawTracesV2(t *testing.T) { +func TestFindTracesWithRawTraces(t *testing.T) { tests := []struct { rawTraces bool attributes pcommon.Map @@ -351,7 +352,7 @@ func TestFindTracesWithRawTracesV2(t *testing.T) { yield([]ptrace.Traces{trace}, nil) }) - tqs := initializeTestServiceV2() + tqs := initializeTestService() duration, err := time.ParseDuration("20ms") require.NoError(t, err) now := time.Now() @@ -364,7 +365,7 @@ func TestFindTracesWithRawTracesV2(t *testing.T) { }). Return(responseIter, nil).Once() - query := TraceQueryParams{ + query := querysvc.TraceQueryParams{ TraceQueryParams: tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", @@ -387,8 +388,8 @@ func TestFindTracesWithRawTracesV2(t *testing.T) { } } -func TestArchiveTraceV2_NoOptions(t *testing.T) { - tqs := initializeTestServiceV2() +func TestArchiveTrace_NoOptions(t *testing.T) { + tqs := initializeTestService() type contextKey string ctx := context.Background() @@ -397,11 +398,11 @@ func TestArchiveTraceV2_NoOptions(t *testing.T) { } err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) - assert.Equal(t, errNoArchiveSpanStorage, err) + assert.Equal(t, querysvc.ErrNoArchiveSpanStorage, err) } -func TestArchiveTraceV2_ArchiveWriterError(t *testing.T) { - tqs := initializeTestServiceV2(withArchiveTraceWriter()) +func TestArchiveTrace_ArchiveWriterError(t *testing.T) { + tqs := initializeTestService(withArchiveTraceWriter()) responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { yield([]ptrace.Traces{makeTestTrace()}, nil) @@ -420,8 +421,8 @@ func TestArchiveTraceV2_ArchiveWriterError(t *testing.T) { require.ErrorIs(t, err, assert.AnError) } -func TestArchiveTraceV2_Success(t *testing.T) { - tqs := initializeTestServiceV2(withArchiveTraceWriter()) +func TestArchiveTrace_Success(t *testing.T) { + tqs := initializeTestService(withArchiveTraceWriter()) responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { yield([]ptrace.Traces{makeTestTrace()}, nil) @@ -440,8 +441,8 @@ func TestArchiveTraceV2_Success(t *testing.T) { require.NoError(t, err) } -func TestGetDependenciesV2(t *testing.T) { - tqs := initializeTestServiceV2() +func TestGetDependencies(t *testing.T) { + tqs := initializeTestService() expected := []model.DependencyLink{ { Parent: "killer", @@ -465,18 +466,18 @@ func TestGetDependenciesV2(t *testing.T) { assert.Equal(t, expected, actualDependencies) } -func TestGetCapabilitiesV2(t *testing.T) { - tqs := initializeTestServiceV2() - expected := StorageCapabilities{ +func TestGetCapabilities(t *testing.T) { + tqs := initializeTestService() + expected := querysvc.StorageCapabilities{ ArchiveStorage: false, } assert.Equal(t, expected, tqs.queryService.GetCapabilities()) } -func TestGetCapabilitiesWithSupportsArchiveV2(t *testing.T) { - tqs := initializeTestServiceV2(withArchiveTraceReader(), withArchiveTraceWriter()) +func TestGetCapabilitiesWithSupportsArchive(t *testing.T) { + tqs := initializeTestService(withArchiveTraceReader(), withArchiveTraceWriter()) - expected := StorageCapabilities{ + expected := querysvc.StorageCapabilities{ ArchiveStorage: true, } assert.Equal(t, expected, tqs.queryService.GetCapabilities()) From c14fc4f84d812ffef7cf8aa20f161c9be886065c Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Mon, 30 Dec 2024 23:36:42 -0500 Subject: [PATCH 26/35] Fix Comment Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 8921a15a3fa..8e256a5d3c7 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -134,7 +134,7 @@ func TestGetTracesWithRawTraces(t *testing.T) { expected pcommon.Map }{ { - // tags should not get sorted by SortTagsAndLogFields adjuster + // tags should not get sorted by SortCollections adjuster rawTraces: true, attributes: func() pcommon.Map { m := pcommon.NewMap() @@ -150,7 +150,7 @@ func TestGetTracesWithRawTraces(t *testing.T) { }(), }, { - // tags should get sorted by SortTagsAndLogFields adjuster + // tags should get sorted by SortCollections adjuster rawTraces: false, attributes: func() pcommon.Map { m := pcommon.NewMap() From 73fbb197eb410b9c4d5c8956fe41fd6f074adaef Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 10:13:13 -0500 Subject: [PATCH 27/35] Add Test For Error In Get Trace Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 32 +++++++++++++++---- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 8e256a5d3c7..3f8abc6b62f 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -102,7 +102,7 @@ func TestGetTracesSuccess(t *testing.T) { tqs := initializeTestService() tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter, nil).Once() + Return(responseIter).Once() params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ @@ -176,7 +176,7 @@ func TestGetTracesWithRawTraces(t *testing.T) { tqs := initializeTestService() tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter, nil).Once() + Return(responseIter).Once() params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ @@ -210,7 +210,7 @@ func TestGetTraces_TraceInArchiveStorage(t *testing.T) { tqs := initializeTestService(withArchiveTraceReader()) tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter, nil).Once() + Return(responseIter).Once() tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(archiveResponseIter, nil).Once() @@ -278,7 +278,7 @@ func TestFindTraces(t *testing.T) { DurationMin: duration, NumTraces: 200, }). - Return(responseIter, nil).Once() + Return(responseIter).Once() query := querysvc.TraceQueryParams{ TraceQueryParams: tracestore.TraceQueryParams{ @@ -363,7 +363,7 @@ func TestFindTracesWithRawTraces(t *testing.T) { DurationMin: duration, NumTraces: 200, }). - Return(responseIter, nil).Once() + Return(responseIter).Once() query := querysvc.TraceQueryParams{ TraceQueryParams: tracestore.TraceQueryParams{ @@ -401,6 +401,24 @@ func TestArchiveTrace_NoOptions(t *testing.T) { assert.Equal(t, querysvc.ErrNoArchiveSpanStorage, err) } +func TestArchiveTrace_GetTraceError(t *testing.T) { + tqs := initializeTestService(withArchiveTraceWriter()) + + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, assert.AnError) + }) + + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + err := tqs.queryService.ArchiveTrace(context.Background(), query) + require.ErrorIs(t, err, assert.AnError) +} + func TestArchiveTrace_ArchiveWriterError(t *testing.T) { tqs := initializeTestService(withArchiveTraceWriter()) @@ -409,7 +427,7 @@ func TestArchiveTrace_ArchiveWriterError(t *testing.T) { }) tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter, nil).Once() + Return(responseIter).Once() tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). Return(assert.AnError).Once() @@ -429,7 +447,7 @@ func TestArchiveTrace_Success(t *testing.T) { }) tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter, nil).Once() + Return(responseIter).Once() tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). Return(nil).Once() From 5020f894ee0a47d6901e7a445f364f4040478192 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 10:20:48 -0500 Subject: [PATCH 28/35] Use Flatten With Errors Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 37 ++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 3f8abc6b62f..f346550573c 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -111,12 +111,9 @@ func TestGetTracesSuccess(t *testing.T) { }, }, } - var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.GetTraces(context.Background(), params) - getTracesIter(func(traces []ptrace.Traces, _ error) bool { - gotTraces = append(gotTraces, traces...) - return true - }) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) require.Len(t, gotTraces, 1) gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -186,12 +183,10 @@ func TestGetTracesWithRawTraces(t *testing.T) { }, RawTraces: test.rawTraces, } - var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.GetTraces(context.Background(), params) - getTracesIter(func(traces []ptrace.Traces, _ error) bool { - gotTraces = append(gotTraces, traces...) - return true - }) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() require.Equal(t, test.expected, gotAttributes) @@ -221,12 +216,9 @@ func TestGetTraces_TraceInArchiveStorage(t *testing.T) { }, }, } - var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.GetTraces(context.Background(), params) - getTracesIter(func(traces []ptrace.Traces, _ error) bool { - gotTraces = append(gotTraces, traces...) - return true - }) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) require.Len(t, gotTraces, 1) gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -289,12 +281,9 @@ func TestFindTraces(t *testing.T) { NumTraces: 200, }, } - var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.FindTraces(context.Background(), query) - getTracesIter(func(traces []ptrace.Traces, _ error) bool { - gotTraces = append(gotTraces, traces...) - return true - }) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) require.Len(t, gotTraces, 1) gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -375,12 +364,10 @@ func TestFindTracesWithRawTraces(t *testing.T) { }, RawTraces: test.rawTraces, } - var gotTraces []ptrace.Traces getTracesIter := tqs.queryService.FindTraces(context.Background(), query) - getTracesIter(func(traces []ptrace.Traces, _ error) bool { - gotTraces = append(gotTraces, traces...) - return true - }) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() require.Equal(t, test.expected, gotAttributes) From e29eafe14b0936781956b32954e4003c9a9ac0eb Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 10:22:43 -0500 Subject: [PATCH 29/35] Add Test For Error In Get Trace Reader Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index f346550573c..7d62cc7bac9 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -95,6 +95,25 @@ func makeTestTrace() ptrace.Traces { return trace } +func TestGetTraces_ErrorInReader(t *testing.T) { + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(nil, assert.AnError) + })).Once() + + params := querysvc.GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + _, err := iter.FlattenWithErrors(getTracesIter) + require.ErrorIs(t, err, assert.AnError) +} + func TestGetTracesSuccess(t *testing.T) { responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { yield([]ptrace.Traces{makeTestTrace()}, nil) From 2c716bf9c86a3f45c48c20f818ab0ff2a5500afe Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 10:28:13 -0500 Subject: [PATCH 30/35] Cleanup Tests Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 69 +++++++------------ 1 file changed, 26 insertions(+), 43 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 7d62cc7bac9..1d5742b95bd 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -114,27 +114,23 @@ func TestGetTraces_ErrorInReader(t *testing.T) { require.ErrorIs(t, err, assert.AnError) } -func TestGetTracesSuccess(t *testing.T) { - responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTrace()}, nil) - }) - +func TestGetTraces_Success(t *testing.T) { tqs := initializeTestService() tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter).Once() + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ - { - TraceID: testTraceID, - }, + {TraceID: testTraceID}, }, } getTracesIter := tqs.queryService.GetTraces(context.Background(), params) gotTraces, err := iter.FlattenWithErrors(getTracesIter) require.NoError(t, err) - require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() require.Equal(t, 2, gotSpans.Len()) require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) @@ -143,7 +139,7 @@ func TestGetTracesSuccess(t *testing.T) { require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) } -func TestGetTracesWithRawTraces(t *testing.T) { +func TestGetTraces_WithRawTraces(t *testing.T) { tests := []struct { rawTraces bool attributes pcommon.Map @@ -214,32 +210,28 @@ func TestGetTracesWithRawTraces(t *testing.T) { } func TestGetTraces_TraceInArchiveStorage(t *testing.T) { - responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{}, nil) - }) - - archiveResponseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTrace()}, nil) - }) - tqs := initializeTestService(withArchiveTraceReader()) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter).Once() + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, nil) + })).Once() + tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(archiveResponseIter, nil).Once() + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() params := querysvc.GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ - { - TraceID: testTraceID, - }, + {TraceID: testTraceID}, }, } getTracesIter := tqs.queryService.GetTraces(context.Background(), params) gotTraces, err := iter.FlattenWithErrors(getTracesIter) require.NoError(t, err) - require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() require.Equal(t, 2, gotSpans.Len()) require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) @@ -273,38 +265,29 @@ func TestGetOperations(t *testing.T) { assert.Equal(t, expected, actualOperations) } -func TestFindTraces(t *testing.T) { +func TestFindTraces_Success(t *testing.T) { + tqs := initializeTestService() responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { yield([]ptrace.Traces{makeTestTrace()}, nil) }) - tqs := initializeTestService() - duration, err := time.ParseDuration("20ms") - require.NoError(t, err) + duration := 20 * time.Millisecond now := time.Now() - tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + queryParams := tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", StartTimeMax: now, DurationMin: duration, NumTraces: 200, - }). - Return(responseIter).Once() - - query := querysvc.TraceQueryParams{ - TraceQueryParams: tracestore.TraceQueryParams{ - ServiceName: "service", - OperationName: "operation", - StartTimeMax: now, - DurationMin: duration, - NumTraces: 200, - }, } + tqs.traceReader.On("FindTraces", mock.Anything, queryParams).Return(responseIter).Once() + + query := querysvc.TraceQueryParams{TraceQueryParams: queryParams} getTracesIter := tqs.queryService.FindTraces(context.Background(), query) gotTraces, err := iter.FlattenWithErrors(getTracesIter) require.NoError(t, err) - require.Len(t, gotTraces, 1) + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() require.Equal(t, 2, gotSpans.Len()) require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) @@ -313,7 +296,7 @@ func TestFindTraces(t *testing.T) { require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) } -func TestFindTracesWithRawTraces(t *testing.T) { +func TestFindTraces_WithRawTraces(t *testing.T) { tests := []struct { rawTraces bool attributes pcommon.Map From 28e5fd02a9373cbaf30cab12aa234aad1218af8b Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 10:34:37 -0500 Subject: [PATCH 31/35] Combine Tests For Brevity Signed-off-by: Mahad Zaryab --- .../app/querysvc/query_service_v2_test.go | 188 +++++++++--------- 1 file changed, 97 insertions(+), 91 deletions(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 1d5742b95bd..00d809b2435 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -377,115 +377,121 @@ func TestFindTraces_WithRawTraces(t *testing.T) { } } -func TestArchiveTrace_NoOptions(t *testing.T) { - tqs := initializeTestService() - - type contextKey string - ctx := context.Background() - query := tracestore.GetTraceParams{ - TraceID: testTraceID, - } - - err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) - assert.Equal(t, querysvc.ErrNoArchiveSpanStorage, err) -} - -func TestArchiveTrace_GetTraceError(t *testing.T) { - tqs := initializeTestService(withArchiveTraceWriter()) - - responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{}, assert.AnError) - }) - - tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter).Once() - - query := tracestore.GetTraceParams{ - TraceID: testTraceID, - } - - err := tqs.queryService.ArchiveTrace(context.Background(), query) - require.ErrorIs(t, err, assert.AnError) -} - -func TestArchiveTrace_ArchiveWriterError(t *testing.T) { - tqs := initializeTestService(withArchiveTraceWriter()) - - responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTrace()}, nil) - }) - - tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter).Once() - tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). - Return(assert.AnError).Once() - - query := tracestore.GetTraceParams{ - TraceID: testTraceID, +func TestArchiveTrace(t *testing.T) { + tests := []struct { + name string + options []testOption + setupMocks func(tqs *testQueryService) + expectedError error + }{ + { + name: "no options", + options: nil, + setupMocks: func(tqs *testQueryService) {}, + expectedError: querysvc.ErrNoArchiveSpanStorage, + }, + { + name: "get trace error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, assert.AnError) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + }, + expectedError: assert.AnError, + }, + { + name: "archive writer error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(assert.AnError).Once() + }, + expectedError: assert.AnError, + }, + { + name: "success", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(nil).Once() + }, + expectedError: nil, + }, } - err := tqs.queryService.ArchiveTrace(context.Background(), query) - require.ErrorIs(t, err, assert.AnError) -} - -func TestArchiveTrace_Success(t *testing.T) { - tqs := initializeTestService(withArchiveTraceWriter()) - - responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { - yield([]ptrace.Traces{makeTestTrace()}, nil) - }) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + test.setupMocks(tqs) - tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). - Return(responseIter).Once() - tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). - Return(nil).Once() + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } - query := tracestore.GetTraceParams{ - TraceID: testTraceID, + err := tqs.queryService.ArchiveTrace(context.Background(), query) + if test.expectedError != nil { + require.ErrorIs(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + }) } - - err := tqs.queryService.ArchiveTrace(context.Background(), query) - require.NoError(t, err) } func TestGetDependencies(t *testing.T) { tqs := initializeTestService() expected := []model.DependencyLink{ - { - Parent: "killer", - Child: "queen", - CallCount: 12, - }, + {Parent: "killer", Child: "queen", CallCount: 12}, } endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) - tqs.depsReader.On( - "GetDependencies", - mock.Anything, // context.Context - depstore.QueryParameters{ - StartTime: endTs.Add(-defaultDependencyLookbackDuration), - EndTime: endTs, - }).Return(expected, nil).Times(1) - - actualDependencies, err := tqs.queryService.GetDependencies( - context.Background(), endTs, - defaultDependencyLookbackDuration) + tqs.depsReader.On("GetDependencies", mock.Anything, depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }).Return(expected, nil).Once() + + actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), endTs, defaultDependencyLookbackDuration) require.NoError(t, err) assert.Equal(t, expected, actualDependencies) } func TestGetCapabilities(t *testing.T) { - tqs := initializeTestService() - expected := querysvc.StorageCapabilities{ - ArchiveStorage: false, + tests := []struct { + name string + options []testOption + expected querysvc.StorageCapabilities + }{ + { + name: "without archive storage", + expected: querysvc.StorageCapabilities{ + ArchiveStorage: false, + }, + }, + { + name: "with archive storage", + options: []testOption{withArchiveTraceReader(), withArchiveTraceWriter()}, + expected: querysvc.StorageCapabilities{ + ArchiveStorage: true, + }, + }, } - assert.Equal(t, expected, tqs.queryService.GetCapabilities()) -} - -func TestGetCapabilitiesWithSupportsArchive(t *testing.T) { - tqs := initializeTestService(withArchiveTraceReader(), withArchiveTraceWriter()) - expected := querysvc.StorageCapabilities{ - ArchiveStorage: true, + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + assert.Equal(t, test.expected, tqs.queryService.GetCapabilities()) + }) } - assert.Equal(t, expected, tqs.queryService.GetCapabilities()) } From 5ae23f215f3eb5241284696541cccbe43afa9e38 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 11:08:46 -0500 Subject: [PATCH 32/35] Fix Lint Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service_v2_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/query_service_v2_test.go index 00d809b2435..a90bbc59c8f 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/query_service_v2_test.go @@ -387,7 +387,7 @@ func TestArchiveTrace(t *testing.T) { { name: "no options", options: nil, - setupMocks: func(tqs *testQueryService) {}, + setupMocks: func(*testQueryService) {}, expectedError: querysvc.ErrNoArchiveSpanStorage, }, { From 563642760c0d8a471ac681d7b84248958cd32e61 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 13:37:30 -0500 Subject: [PATCH 33/35] Move Query Service To V2 Package Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/query_service.go | 16 ++++++++- cmd/query/app/querysvc/query_service_test.go | 2 +- .../querysvc/service.go} | 6 ++-- .../querysvc/service_test.go} | 35 +++++++++---------- 4 files changed, 36 insertions(+), 23 deletions(-) rename cmd/query/app/querysvc/{query_service_v2.go => v2/querysvc/service.go} (97%) rename cmd/query/app/querysvc/{query_service_v2_test.go => v2/querysvc/service_test.go} (94%) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index be6a2bc256e..4eff06eeb7b 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -19,6 +19,12 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) +var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") + +const ( + defaultMaxClockSkewAdjust = time.Second +) + // QueryServiceOptions has optional members of QueryService type QueryServiceOptions struct { ArchiveSpanReader spanstore.Reader @@ -26,6 +32,14 @@ type QueryServiceOptions struct { Adjuster adjuster.Adjuster } +// StorageCapabilities is a feature flag for query service +type StorageCapabilities struct { + ArchiveStorage bool `json:"archiveStorage"` + // TODO: Maybe add metrics Storage here + // SupportRegex bool + // SupportTagFilter bool +} + // QueryService contains span utils required by the query-service. type QueryService struct { traceReader tracestore.Reader @@ -123,7 +137,7 @@ func (qs QueryService) FindTraces(ctx context.Context, query *TraceQueryParamete // ArchiveTrace is the queryService utility to archive traces. func (qs QueryService) ArchiveTrace(ctx context.Context, query spanstore.GetTraceParameters) error { if qs.options.ArchiveSpanWriter == nil { - return ErrNoArchiveSpanStorage + return errNoArchiveSpanStorage } trace, err := qs.GetTrace(ctx, GetTraceParameters{GetTraceParameters: query}) if err != nil { diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index ff2b744d74b..64ef7500e3f 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -402,7 +402,7 @@ func TestArchiveTraceNoOptions(t *testing.T) { } err := tqs.queryService.ArchiveTrace(context.WithValue(ctx, contextKey("foo"), "bar"), query) - assert.Equal(t, ErrNoArchiveSpanStorage, err) + assert.Equal(t, errNoArchiveSpanStorage, err) } // Test QueryService.ArchiveTrace() with ArchiveSpanWriter but invalid traceID. diff --git a/cmd/query/app/querysvc/query_service_v2.go b/cmd/query/app/querysvc/v2/querysvc/service.go similarity index 97% rename from cmd/query/app/querysvc/query_service_v2.go rename to cmd/query/app/querysvc/v2/querysvc/service.go index 28be19f198f..e0383388f3e 100644 --- a/cmd/query/app/querysvc/query_service_v2.go +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster" + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" @@ -19,7 +19,7 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) -var ErrNoArchiveSpanStorage = errors.New("archive span storage was not configured") +var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") const ( defaultMaxClockSkewAdjust = time.Second @@ -137,7 +137,7 @@ func (qs QueryServiceV2) FindTraces( // an error indicating that there is no archive span storage available. func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { if qs.options.ArchiveTraceWriter == nil { - return ErrNoArchiveSpanStorage + return errNoArchiveSpanStorage } getTracesIter := qs.GetTraces( ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, diff --git a/cmd/query/app/querysvc/query_service_v2_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go similarity index 94% rename from cmd/query/app/querysvc/query_service_v2_test.go rename to cmd/query/app/querysvc/v2/querysvc/service_test.go index a90bbc59c8f..1993fc87ac7 100644 --- a/cmd/query/app/querysvc/query_service_v2_test.go +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -1,7 +1,7 @@ // Copyright (c) 2024 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 -package querysvc_test +package querysvc import ( "context" @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" - "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage_v2/depstore" @@ -33,7 +32,7 @@ var ( ) type testQueryService struct { - queryService *querysvc.QueryServiceV2 + queryService *QueryServiceV2 traceReader *tracestoremocks.Reader depsReader *depstoremocks.Reader @@ -41,10 +40,10 @@ type testQueryService struct { archiveTraceWriter *tracestoremocks.Writer } -type testOption func(*testQueryService, *querysvc.QueryServiceOptionsV2) +type testOption func(*testQueryService, *QueryServiceOptionsV2) func withArchiveTraceReader() testOption { - return func(tqs *testQueryService, options *querysvc.QueryServiceOptionsV2) { + return func(tqs *testQueryService, options *QueryServiceOptionsV2) { r := &tracestoremocks.Reader{} tqs.archiveTraceReader = r options.ArchiveTraceReader = r @@ -52,7 +51,7 @@ func withArchiveTraceReader() testOption { } func withArchiveTraceWriter() testOption { - return func(tqs *testQueryService, options *querysvc.QueryServiceOptionsV2) { + return func(tqs *testQueryService, options *QueryServiceOptionsV2) { r := &tracestoremocks.Writer{} tqs.archiveTraceWriter = r options.ArchiveTraceWriter = r @@ -63,7 +62,7 @@ func initializeTestService(opts ...testOption) *testQueryService { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} - options := querysvc.QueryServiceOptionsV2{} + options := QueryServiceOptionsV2{} tqs := testQueryService{ traceReader: traceReader, @@ -74,7 +73,7 @@ func initializeTestService(opts ...testOption) *testQueryService { opt(&tqs, &options) } - tqs.queryService = querysvc.NewQueryServiceV2(traceReader, dependencyStorage, options) + tqs.queryService = NewQueryServiceV2(traceReader, dependencyStorage, options) return &tqs } @@ -102,7 +101,7 @@ func TestGetTraces_ErrorInReader(t *testing.T) { yield(nil, assert.AnError) })).Once() - params := querysvc.GetTraceParams{ + params := GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ { TraceID: testTraceID, @@ -121,7 +120,7 @@ func TestGetTraces_Success(t *testing.T) { yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() - params := querysvc.GetTraceParams{ + params := GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ {TraceID: testTraceID}, }, @@ -190,7 +189,7 @@ func TestGetTraces_WithRawTraces(t *testing.T) { tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). Return(responseIter).Once() - params := querysvc.GetTraceParams{ + params := GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ { TraceID: testTraceID, @@ -222,7 +221,7 @@ func TestGetTraces_TraceInArchiveStorage(t *testing.T) { yield([]ptrace.Traces{makeTestTrace()}, nil) })).Once() - params := querysvc.GetTraceParams{ + params := GetTraceParams{ TraceIDs: []tracestore.GetTraceParams{ {TraceID: testTraceID}, }, @@ -282,7 +281,7 @@ func TestFindTraces_Success(t *testing.T) { } tqs.traceReader.On("FindTraces", mock.Anything, queryParams).Return(responseIter).Once() - query := querysvc.TraceQueryParams{TraceQueryParams: queryParams} + query := TraceQueryParams{TraceQueryParams: queryParams} getTracesIter := tqs.queryService.FindTraces(context.Background(), query) gotTraces, err := iter.FlattenWithErrors(getTracesIter) require.NoError(t, err) @@ -356,7 +355,7 @@ func TestFindTraces_WithRawTraces(t *testing.T) { }). Return(responseIter).Once() - query := querysvc.TraceQueryParams{ + query := TraceQueryParams{ TraceQueryParams: tracestore.TraceQueryParams{ ServiceName: "service", OperationName: "operation", @@ -388,7 +387,7 @@ func TestArchiveTrace(t *testing.T) { name: "no options", options: nil, setupMocks: func(*testQueryService) {}, - expectedError: querysvc.ErrNoArchiveSpanStorage, + expectedError: errNoArchiveSpanStorage, }, { name: "get trace error", @@ -471,18 +470,18 @@ func TestGetCapabilities(t *testing.T) { tests := []struct { name string options []testOption - expected querysvc.StorageCapabilities + expected StorageCapabilities }{ { name: "without archive storage", - expected: querysvc.StorageCapabilities{ + expected: StorageCapabilities{ ArchiveStorage: false, }, }, { name: "with archive storage", options: []testOption{withArchiveTraceReader(), withArchiveTraceWriter()}, - expected: querysvc.StorageCapabilities{ + expected: StorageCapabilities{ ArchiveStorage: true, }, }, From 529bcdaa7227ca74bde32a6c7bdcab06fb2750e2 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 13:40:34 -0500 Subject: [PATCH 34/35] Fix Lint Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/v2/querysvc/package_test.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 cmd/query/app/querysvc/v2/querysvc/package_test.go diff --git a/cmd/query/app/querysvc/v2/querysvc/package_test.go b/cmd/query/app/querysvc/v2/querysvc/package_test.go new file mode 100644 index 00000000000..755423f86da --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} From 806a265a5727577e95d9ee435a0968f55c327044 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Tue, 31 Dec 2024 14:07:07 -0500 Subject: [PATCH 35/35] Drop V2 Suffix Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/v2/querysvc/service.go | 40 +++++++++---------- .../app/querysvc/v2/querysvc/service_test.go | 12 +++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/cmd/query/app/querysvc/v2/querysvc/service.go b/cmd/query/app/querysvc/v2/querysvc/service.go index e0383388f3e..bfcc98053c3 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service.go +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -25,8 +25,8 @@ const ( defaultMaxClockSkewAdjust = time.Second ) -// QueryServiceOptionsV2 holds the configuration options for the V2 QueryService. -type QueryServiceOptionsV2 struct { +// QueryServiceOptions holds the configuration options for the query service. +type QueryServiceOptions struct { // ArchiveTraceReader is used to read archived traces from the storage. ArchiveTraceReader tracestore.Reader // ArchiveTraceWriter is used to write traces to the archive storage. @@ -44,11 +44,11 @@ type StorageCapabilities struct { // SupportTagFilter bool } -// QueryServiceV2 provides methods to query data from the storage. -type QueryServiceV2 struct { +// QueryService provides methods to query data from the storage. +type QueryService struct { traceReader tracestore.Reader dependencyReader depstore.Reader - options QueryServiceOptionsV2 + options QueryServiceOptions } // GetTraceParams defines the parameters for retrieving traces using the GetTraces function. @@ -56,7 +56,7 @@ type GetTraceParams struct { // TraceIDs is a slice of trace identifiers to fetch. TraceIDs []tracestore.GetTraceParams // RawTraces indicates whether to retrieve raw traces. - // If set to false, the traces will be adjusted using QueryServiceOptionsV2.Adjuster. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. RawTraces bool } @@ -64,16 +64,16 @@ type GetTraceParams struct { type TraceQueryParams struct { tracestore.TraceQueryParams // RawTraces indicates whether to retrieve raw traces. - // If set to false, the traces will be adjusted using QueryServiceOptionsV2.Adjuster. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. RawTraces bool } -func NewQueryServiceV2( +func NewQueryService( traceReader tracestore.Reader, dependencyReader depstore.Reader, - options QueryServiceOptionsV2, -) *QueryServiceV2 { - qsvc := &QueryServiceV2{ + options QueryServiceOptions, +) *QueryService { + qsvc := &QueryService{ traceReader: traceReader, dependencyReader: dependencyReader, options: options, @@ -89,7 +89,7 @@ func NewQueryServiceV2( // GetTraces retrieves traces with given trace IDs from the primary reader, // and if any of them are not found it then queries the archive reader. // The iterator is single-use: once consumed, it cannot be used again. -func (qs QueryServiceV2) GetTraces( +func (qs QueryService) GetTraces( ctx context.Context, params GetTraceParams, ) iter.Seq2[[]ptrace.Traces, error] { @@ -111,18 +111,18 @@ func (qs QueryServiceV2) GetTraces( } } -func (qs QueryServiceV2) GetServices(ctx context.Context) ([]string, error) { +func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { return qs.traceReader.GetServices(ctx) } -func (qs QueryServiceV2) GetOperations( +func (qs QueryService) GetOperations( ctx context.Context, query tracestore.OperationQueryParams, ) ([]tracestore.Operation, error) { return qs.traceReader.GetOperations(ctx, query) } -func (qs QueryServiceV2) FindTraces( +func (qs QueryService) FindTraces( ctx context.Context, query TraceQueryParams, ) iter.Seq2[[]ptrace.Traces, error] { @@ -135,7 +135,7 @@ func (qs QueryServiceV2) FindTraces( // ArchiveTrace archives a trace specified by the given query parameters. // If the ArchiveTraceWriter is not configured, it returns // an error indicating that there is no archive span storage available. -func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { +func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { if qs.options.ArchiveTraceWriter == nil { return errNoArchiveSpanStorage } @@ -159,24 +159,24 @@ func (qs QueryServiceV2) ArchiveTrace(ctx context.Context, query tracestore.GetT return archiveErr } -func (qs QueryServiceV2) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ StartTime: endTs.Add(-lookback), EndTime: endTs, }) } -func (qs QueryServiceV2) GetCapabilities() StorageCapabilities { +func (qs QueryService) GetCapabilities() StorageCapabilities { return StorageCapabilities{ ArchiveStorage: qs.options.hasArchiveStorage(), } } -func (opts *QueryServiceOptionsV2) hasArchiveStorage() bool { +func (opts *QueryServiceOptions) hasArchiveStorage() bool { return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil } -func (qs QueryServiceV2) receiveTraces( +func (qs QueryService) receiveTraces( seq iter.Seq2[[]ptrace.Traces, error], yield func([]ptrace.Traces, error) bool, rawTraces bool, diff --git a/cmd/query/app/querysvc/v2/querysvc/service_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go index 1993fc87ac7..4794c2d4c99 100644 --- a/cmd/query/app/querysvc/v2/querysvc/service_test.go +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -32,7 +32,7 @@ var ( ) type testQueryService struct { - queryService *QueryServiceV2 + queryService *QueryService traceReader *tracestoremocks.Reader depsReader *depstoremocks.Reader @@ -40,10 +40,10 @@ type testQueryService struct { archiveTraceWriter *tracestoremocks.Writer } -type testOption func(*testQueryService, *QueryServiceOptionsV2) +type testOption func(*testQueryService, *QueryServiceOptions) func withArchiveTraceReader() testOption { - return func(tqs *testQueryService, options *QueryServiceOptionsV2) { + return func(tqs *testQueryService, options *QueryServiceOptions) { r := &tracestoremocks.Reader{} tqs.archiveTraceReader = r options.ArchiveTraceReader = r @@ -51,7 +51,7 @@ func withArchiveTraceReader() testOption { } func withArchiveTraceWriter() testOption { - return func(tqs *testQueryService, options *QueryServiceOptionsV2) { + return func(tqs *testQueryService, options *QueryServiceOptions) { r := &tracestoremocks.Writer{} tqs.archiveTraceWriter = r options.ArchiveTraceWriter = r @@ -62,7 +62,7 @@ func initializeTestService(opts ...testOption) *testQueryService { traceReader := &tracestoremocks.Reader{} dependencyStorage := &depstoremocks.Reader{} - options := QueryServiceOptionsV2{} + options := QueryServiceOptions{} tqs := testQueryService{ traceReader: traceReader, @@ -73,7 +73,7 @@ func initializeTestService(opts ...testOption) *testQueryService { opt(&tqs, &options) } - tqs.queryService = NewQueryServiceV2(traceReader, dependencyStorage, options) + tqs.queryService = NewQueryService(traceReader, dependencyStorage, options) return &tqs }