Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(frontend): search spans from extension tracers when object name is specified #369

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions pkg/frontend/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
return nil, err
}

fetchFromExtension := false

if len(tts) == 0 {
// searched object has no events during this period,
// try to discover any possible spans from extensions.
if query.Tags["resource"] != "" && query.Tags["name"] != "" {
tts = append(tts, &jaegerbackend.TraceThumbnail{
Identifier: nil,
Spans: tftree.NewSpanTree(fakeObjectSpans(query)),
})
fetchFromExtension = true
}
}

twmList := make([]merge.TraceWithMetadata[any], len(tts))
for i, tt := range tts {
twmList[i] = merge.TraceWithMetadata[any]{
Expand Down Expand Up @@ -245,14 +259,24 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
for _, mergeTree := range mergeTrees {
cacheId := generateCacheId(config.Id)

var cachedSpans []*model.Span
if fetchFromExtension {
rootSpan, _ := tftree.CopySpan(mergeTree.Tree.Root)
cachedSpans = []*model.Span{rootSpan}
}

trace, extensionCache, err := reader.prepareEntry(ctx, rootKey, query, mergeTree.Tree, cacheId)
if err != nil {
return nil, err
}

if fetchFromExtension && len(trace.Spans) <= 1 {
continue
}

traces = append(traces, trace)

cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache)
cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache, cachedSpans)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -318,6 +342,7 @@ func (reader *spanReader) prepareCache(
identifiers []any,
cacheId model.TraceID,
extensionCache []tracecache.ExtensionCache,
cachedSpans []*model.Span,
) (tracecache.Entry, error) {
identifiersJson := make([]json.RawMessage, len(identifiers))
for i, identifier := range identifiers {
Expand All @@ -336,6 +361,7 @@ func (reader *spanReader) prepareCache(
StartTime: query.StartTimeMin,
EndTime: query.StartTimeMax,
RootObject: rootKey,
Spans: cachedSpans,
},
}
if reader.options.cacheExtensions {
Expand All @@ -360,6 +386,9 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (

traces := make([]merge.TraceWithMetadata[struct{}], 0, len(entry.Identifiers))
for _, identifier := range entry.Identifiers {
if identifier == nil || string(identifier) == "null" {
continue
}
trace, err := reader.Backend.Get(ctx, identifier, cacheId, entry.StartTime, entry.EndTime)
if err != nil {
return nil, fmt.Errorf("cannot fetch trace pointed by the cache: %w", err)
Expand Down Expand Up @@ -394,15 +423,21 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (
}

// if spans were connected, they should continue to be connected since link spans cannot be deleted, so assume there is only one trace
if len(mergedTrees) != 1 {
if len(traces) != 0 && len(mergedTrees) != 1 {
return nil, fmt.Errorf("inconsistent linked trace count %d", len(mergedTrees))
}
mergedTree := mergedTrees[0]

aggTrace := &model.Trace{
ProcessMap: []model.Trace_ProcessMapping{{
ProcessID: "0",
}},
Spans: mergedTree.Tree.GetSpans(),
}

if len(mergedTrees) > 0 {
aggTrace.Spans = mergedTrees[0].Tree.GetSpans()
} else {
spans, _ := tftree.CopySpans(entry.Spans)
aggTrace.Spans = spans
}

var extensions transform.ExtensionProcessor = &transform.FetchExtensionsAndStoreCache{}
Expand Down Expand Up @@ -533,3 +568,29 @@ func mergeListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata
return twmList, nil
}
}

func fakeObjectSpans(query *spanstore.TraceQueryParameters) []*model.Span {
tags := []model.KeyValue{
model.String(zconstants.TraceSource, zconstants.TraceSourceObject),
model.String(zconstants.PseudoType, string(zconstants.PseudoTypeObject)),
}
for tagKey, tagVal := range query.Tags {
tags = append(tags, model.String(tagKey, tagVal))
}

for _, requiredKey := range []string{"cluster", "group", "version", "resource", "namespace", "name"} {
if _, ok := query.Tags[requiredKey]; !ok {
tags = append(tags, model.String(requiredKey, query.Tags[requiredKey]))
}
}

return []*model.Span{{
SpanID: model.SpanID(1),
Flags: 0,
StartTime: query.StartTimeMin,
Duration: query.StartTimeMax.Sub(query.StartTimeMin),
Process: &model.Process{ServiceName: fmt.Sprintf("%s %s", query.Tags["cluster"], query.Tags["resource"])},
ProcessID: "0",
Tags: tags,
}}
}
14 changes: 14 additions & 0 deletions pkg/frontend/tf/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ func (tree *SpanTree) Clone() (*SpanTree, error) {
return NewSpanTree(copiedSpans), nil
}

func CopySpans(spans []*model.Span) ([]*model.Span, error) {
copiedSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
spanCopy, err := CopySpan(span)
if err != nil {
return nil, err
}

copiedSpans = append(copiedSpans, spanCopy)
}

return copiedSpans, nil
}

func CopySpan(span *model.Span) (*model.Span, error) {
spanJson, err := json.Marshal(span)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/tracecache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EntryValue struct {
RootObject *utilobject.Key `json:"rootObject"`

Extensions []ExtensionCache `json:"extensions"`
Spans []*model.Span `json:"spans"`
}

type ExtensionCache struct {
Expand Down
Loading