From df20cd3d8d0905a8f0f00c49f20be486024265ee Mon Sep 17 00:00:00 2001 From: xuqingyun Date: Mon, 30 Sep 2024 18:00:31 +0800 Subject: [PATCH] feat(frontend): search spans from extension tracers when object name is specified --- pkg/frontend/reader/merge/merge.go | 12 +++++- pkg/frontend/reader/reader.go | 60 ++++++++++++++++++++++++++-- pkg/frontend/tracecache/interface.go | 1 + 3 files changed, 68 insertions(+), 5 deletions(-) diff --git a/pkg/frontend/reader/merge/merge.go b/pkg/frontend/reader/merge/merge.go index 5ed1c6a5..6bd2ec46 100644 --- a/pkg/frontend/reader/merge/merge.go +++ b/pkg/frontend/reader/merge/merge.go @@ -237,14 +237,24 @@ func newObject[M any](key objKey, trace *tftree.SpanTree, metadata M) (*object[M if err != nil { return nil, fmt.Errorf("clone spans: %w", err) } + + var m []M + if !isNil(metadata) { + m = []M{metadata} + } + obj := &object[M]{ key: key, - metadata: []M{metadata}, + metadata: m, tree: clonedTree, } return obj, nil } +func isNil(v interface{}) bool { + return v == nil +} + func (obj *object[M]) merge(trace *tftree.SpanTree, metadata M) error { obj.metadata = append(obj.metadata, metadata) diff --git a/pkg/frontend/reader/reader.go b/pkg/frontend/reader/reader.go index 27a46190..8bf3e859 100644 --- a/pkg/frontend/reader/reader.go +++ b/pkg/frontend/reader/reader.go @@ -205,6 +205,20 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace return nil, err } + fetchFromExtension := false + + if len(tts) == 0 { + // searched object has no events during this period, + // try to discover any possible spans from extensions. + if query.Tags["resource"] != "" && query.Tags["name"] != "" { + tts = append(tts, &jaegerbackend.TraceThumbnail{ + Identifier: nil, + Spans: tftree.NewSpanTree(fakeObjectSpans(query)), + }) + fetchFromExtension = true + } + } + twmList := make([]merge.TraceWithMetadata[any], len(tts)) for i, tt := range tts { twmList[i] = merge.TraceWithMetadata[any]{ @@ -250,9 +264,18 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace return nil, err } + if fetchFromExtension && len(trace.Spans) <= 1 { + continue + } + + var cachedSpans []*model.Span + if fetchFromExtension { + cachedSpans = trace.Spans + } + traces = append(traces, trace) - cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache) + cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache, cachedSpans) if err != nil { return nil, err } @@ -318,6 +341,7 @@ func (reader *spanReader) prepareCache( identifiers []any, cacheId model.TraceID, extensionCache []tracecache.ExtensionCache, + cachedSpans []*model.Span, ) (tracecache.Entry, error) { identifiersJson := make([]json.RawMessage, len(identifiers)) for i, identifier := range identifiers { @@ -336,6 +360,7 @@ func (reader *spanReader) prepareCache( StartTime: query.StartTimeMin, EndTime: query.StartTimeMax, RootObject: rootKey, + Spans: cachedSpans, }, } if reader.options.cacheExtensions { @@ -360,6 +385,9 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) ( traces := make([]merge.TraceWithMetadata[struct{}], 0, len(entry.Identifiers)) for _, identifier := range entry.Identifiers { + if identifier == nil { + continue + } trace, err := reader.Backend.Get(ctx, identifier, cacheId, entry.StartTime, entry.EndTime) if err != nil { return nil, fmt.Errorf("cannot fetch trace pointed by the cache: %w", err) @@ -394,15 +422,20 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) ( } // if spans were connected, they should continue to be connected since link spans cannot be deleted, so assume there is only one trace - if len(mergedTrees) != 1 { + if len(entry.Identifiers) != 0 && len(mergedTrees) != 1 { return nil, fmt.Errorf("inconsistent linked trace count %d", len(mergedTrees)) } - mergedTree := mergedTrees[0] + aggTrace := &model.Trace{ ProcessMap: []model.Trace_ProcessMapping{{ ProcessID: "0", }}, - Spans: mergedTree.Tree.GetSpans(), + } + + if len(mergedTrees) > 0 { + aggTrace.Spans = mergedTrees[0].Tree.GetSpans() + } else { + aggTrace.Spans = entry.Spans } var extensions transform.ExtensionProcessor = &transform.FetchExtensionsAndStoreCache{} @@ -533,3 +566,22 @@ func mergeListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata return twmList, nil } } + +func fakeObjectSpans(query *spanstore.TraceQueryParameters) []*model.Span { + tags := []model.KeyValue{ + model.String(zconstants.TraceSource, zconstants.TraceSourceObject), + model.String(zconstants.PseudoType, string(zconstants.PseudoTypeObject)), + } + for tagKey, tagVal := range query.Tags { + tags = append(tags, model.String(tagKey, tagVal)) + } + return []*model.Span{{ + SpanID: model.SpanID(1), + Flags: 0, + StartTime: query.StartTimeMin, + Duration: query.StartTimeMax.Sub(query.StartTimeMin), + Process: &model.Process{ServiceName: fmt.Sprintf("%s %s", query.Tags["cluster"], query.Tags["resource"])}, + ProcessID: "0", + Tags: tags, + }} +} diff --git a/pkg/frontend/tracecache/interface.go b/pkg/frontend/tracecache/interface.go index 6e061403..5f2590bc 100644 --- a/pkg/frontend/tracecache/interface.go +++ b/pkg/frontend/tracecache/interface.go @@ -48,6 +48,7 @@ type EntryValue struct { RootObject *utilobject.Key `json:"rootObject"` Extensions []ExtensionCache `json:"extensions"` + Spans []*model.Span `json:"spans"` } type ExtensionCache struct {