Skip to content

Commit

Permalink
feat(frontend): search spans from extension tracers when object name …
Browse files Browse the repository at this point in the history
…is specified
  • Loading branch information
xuqingyun committed Oct 8, 2024
1 parent f8d189a commit df20cd3
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 5 deletions.
12 changes: 11 additions & 1 deletion pkg/frontend/reader/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,24 @@ func newObject[M any](key objKey, trace *tftree.SpanTree, metadata M) (*object[M
if err != nil {
return nil, fmt.Errorf("clone spans: %w", err)
}

var m []M
if !isNil(metadata) {
m = []M{metadata}
}

obj := &object[M]{
key: key,
metadata: []M{metadata},
metadata: m,
tree: clonedTree,
}
return obj, nil
}

func isNil(v interface{}) bool {
return v == nil
}

func (obj *object[M]) merge(trace *tftree.SpanTree, metadata M) error {
obj.metadata = append(obj.metadata, metadata)

Expand Down
60 changes: 56 additions & 4 deletions pkg/frontend/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
return nil, err
}

fetchFromExtension := false

if len(tts) == 0 {
// searched object has no events during this period,
// try to discover any possible spans from extensions.
if query.Tags["resource"] != "" && query.Tags["name"] != "" {
tts = append(tts, &jaegerbackend.TraceThumbnail{
Identifier: nil,
Spans: tftree.NewSpanTree(fakeObjectSpans(query)),
})
fetchFromExtension = true
}
}

twmList := make([]merge.TraceWithMetadata[any], len(tts))
for i, tt := range tts {
twmList[i] = merge.TraceWithMetadata[any]{
Expand Down Expand Up @@ -250,9 +264,18 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace
return nil, err
}

if fetchFromExtension && len(trace.Spans) <= 1 {
continue
}

var cachedSpans []*model.Span
if fetchFromExtension {
cachedSpans = trace.Spans
}

traces = append(traces, trace)

cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache)
cacheEntry, err := reader.prepareCache(rootKey, query, mergeTree.Metadata, cacheId, extensionCache, cachedSpans)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -318,6 +341,7 @@ func (reader *spanReader) prepareCache(
identifiers []any,
cacheId model.TraceID,
extensionCache []tracecache.ExtensionCache,
cachedSpans []*model.Span,
) (tracecache.Entry, error) {
identifiersJson := make([]json.RawMessage, len(identifiers))
for i, identifier := range identifiers {
Expand All @@ -336,6 +360,7 @@ func (reader *spanReader) prepareCache(
StartTime: query.StartTimeMin,
EndTime: query.StartTimeMax,
RootObject: rootKey,
Spans: cachedSpans,
},
}
if reader.options.cacheExtensions {
Expand All @@ -360,6 +385,9 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (

traces := make([]merge.TraceWithMetadata[struct{}], 0, len(entry.Identifiers))
for _, identifier := range entry.Identifiers {
if identifier == nil {
continue
}
trace, err := reader.Backend.Get(ctx, identifier, cacheId, entry.StartTime, entry.EndTime)
if err != nil {
return nil, fmt.Errorf("cannot fetch trace pointed by the cache: %w", err)
Expand Down Expand Up @@ -394,15 +422,20 @@ func (reader *spanReader) GetTrace(ctx context.Context, cacheId model.TraceID) (
}

// if spans were connected, they should continue to be connected since link spans cannot be deleted, so assume there is only one trace
if len(mergedTrees) != 1 {
if len(entry.Identifiers) != 0 && len(mergedTrees) != 1 {
return nil, fmt.Errorf("inconsistent linked trace count %d", len(mergedTrees))
}
mergedTree := mergedTrees[0]

aggTrace := &model.Trace{
ProcessMap: []model.Trace_ProcessMapping{{
ProcessID: "0",
}},
Spans: mergedTree.Tree.GetSpans(),
}

if len(mergedTrees) > 0 {
aggTrace.Spans = mergedTrees[0].Tree.GetSpans()
} else {
aggTrace.Spans = entry.Spans
}

var extensions transform.ExtensionProcessor = &transform.FetchExtensionsAndStoreCache{}
Expand Down Expand Up @@ -533,3 +566,22 @@ func mergeListWithBackend[M any](backend jaegerbackend.Backend, convertMetadata
return twmList, nil
}
}

func fakeObjectSpans(query *spanstore.TraceQueryParameters) []*model.Span {
tags := []model.KeyValue{
model.String(zconstants.TraceSource, zconstants.TraceSourceObject),
model.String(zconstants.PseudoType, string(zconstants.PseudoTypeObject)),
}
for tagKey, tagVal := range query.Tags {
tags = append(tags, model.String(tagKey, tagVal))
}
return []*model.Span{{
SpanID: model.SpanID(1),
Flags: 0,
StartTime: query.StartTimeMin,
Duration: query.StartTimeMax.Sub(query.StartTimeMin),
Process: &model.Process{ServiceName: fmt.Sprintf("%s %s", query.Tags["cluster"], query.Tags["resource"])},
ProcessID: "0",
Tags: tags,
}}
}
1 change: 1 addition & 0 deletions pkg/frontend/tracecache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type EntryValue struct {
RootObject *utilobject.Key `json:"rootObject"`

Extensions []ExtensionCache `json:"extensions"`
Spans []*model.Span `json:"spans"`
}

type ExtensionCache struct {
Expand Down

0 comments on commit df20cd3

Please sign in to comment.