diff --git a/cmd/jaeger/internal/integration/badger_test.go b/cmd/jaeger/internal/integration/badger_test.go index b5c62d5c21a..3c2a7bdc5a4 100644 --- a/cmd/jaeger/internal/integration/badger_test.go +++ b/cmd/jaeger/internal/integration/badger_test.go @@ -17,10 +17,6 @@ func TestBadgerStorage(t *testing.T) { StorageIntegration: integration.StorageIntegration{ SkipArchiveTest: true, CleanUp: purge, - - // TODO: remove this once badger supports returning spanKind from GetOperations - // Cf https://github.com/jaegertracing/jaeger/issues/1922 - GetOperationsMissingSpanKind: true, }, } s.e2eInitialize(t, "badger") diff --git a/plugin/storage/badger/factory.go b/plugin/storage/badger/factory.go index 10de32d8e8e..a82e0bace2e 100644 --- a/plugin/storage/badger/factory.go +++ b/plugin/storage/badger/factory.go @@ -15,6 +15,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/spf13/viper" + "go.opentelemetry.io/collector/featuregate" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/distributedlock" @@ -46,7 +47,15 @@ var ( // interface comformance checks // TODO badger could implement archive storage // _ storage.ArchiveFactory = (*Factory)(nil) - _ storage.SamplingStoreFactory = (*Factory)(nil) + _ storage.SamplingStoreFactory = (*Factory)(nil) + includeDualLookUp = featuregate.GlobalRegistry().MustRegister( + "jaeger.badger.dualLookUp", + featuregate.StageBeta, // enabled by default + featuregate.WithRegisterFromVersion("v2.2.0"), + featuregate.WithRegisterToVersion("v2.5.0"), + featuregate.WithRegisterDescription("Allows the reader to also look up traces via the old index key"), + featuregate.WithRegisterReferenceURL("https://github.com/jaegertracing/jaeger/pull/6376"), + ) ) // Factory implements storage.Factory for Badger backend. @@ -150,7 +159,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.store = store - f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans) + f.cache = badgerStore.NewCacheStore(f.Config.TTL.Spans) f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName}) f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName}) @@ -176,7 +185,7 @@ func initializeDir(path string) { // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - tr := badgerStore.NewTraceReader(f.store, f.cache, true) + tr := badgerStore.NewTraceReader(f.store, f.cache, true, includeDualLookUp.IsEnabled()) return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil } diff --git a/plugin/storage/badger/spanstore/backward_compatibility_test.go b/plugin/storage/badger/spanstore/backward_compatibility_test.go new file mode 100644 index 00000000000..a4d6bca88cf --- /dev/null +++ b/plugin/storage/badger/spanstore/backward_compatibility_test.go @@ -0,0 +1,56 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/dgraph-io/badger/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// This test is for checking the backward compatibility after changing the index.
+// Once dual lookup is completely removed, this test can be removed +func TestBackwardCompatibility(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + startT := time.Now() + tid := startT + cache := NewCacheStore(1 * time.Hour) + reader := NewTraceReader(store, cache, true, true) + writer := NewSpanWriter(store, cache, 1*time.Hour) + oldSpan := model.Span{ + TraceID: model.TraceID{ + Low: 0, + High: 1, + }, + SpanID: model.SpanID(rand.Uint64()), + OperationName: "operation-1", + Process: &model.Process{ + ServiceName: "service", + }, + StartTime: tid, + Duration: time.Duration(time.Duration(1) * time.Millisecond), + } + err := writer.writeSpan(&oldSpan, true) + require.NoError(t, err) + traces, err := reader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{ + ServiceName: "service", + OperationName: "operation-1", + StartTimeMin: startT, + StartTimeMax: startT.Add(time.Duration(time.Millisecond * 10)), + }) + require.NoError(t, err) + assert.Len(t, traces, 1) + assert.Len(t, traces[0].Spans, 1) + assert.Equal(t, oldSpan.TraceID, traces[0].Spans[0].TraceID) + assert.Equal(t, oldSpan.SpanID, traces[0].Spans[0].SpanID) + }) +} diff --git a/plugin/storage/badger/spanstore/cache.go b/plugin/storage/badger/spanstore/cache.go index 9bebc6c63b7..92d6239d0ec 100644 --- a/plugin/storage/badger/spanstore/cache.go +++ b/plugin/storage/badger/spanstore/cache.go @@ -8,29 +8,37 @@ import ( "sync" "time" - "github.com/dgraph-io/badger/v4" - + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" ) // CacheStore saves expensive calculations from the K/V store type CacheStore struct { // Given the small amount of data these will store, we use the same structure as the memory store - cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes - services map[string]uint64 - operations map[string]map[string]uint64 + cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes + services map[string]uint64 + // This map is for the hierarchy: service name, kind and operation name. + // Each service contains the span kinds, and then operation names belonging to that kind. 
+ // This structure will look like: + /* + "service1":{ + SpanKind.unspecified: { + "operation1": uint64 + } + } + */ + // The uint64 value is the expiry time of operation + operations map[string]map[model.SpanKind]map[string]uint64 - store *badger.DB - ttl time.Duration + ttl time.Duration } // NewCacheStore returns initialized CacheStore for badger use -func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore { +func NewCacheStore(ttl time.Duration) *CacheStore { cs := &CacheStore{ services: make(map[string]uint64), - operations: make(map[string]map[string]uint64), + operations: make(map[string]map[model.SpanKind]map[string]uint64), ttl: ttl, - store: db, } return cs } @@ -48,67 +56,73 @@ func (c *CacheStore) AddService(service string, keyTTL uint64) { } // AddOperation adds the cache with operation names with most updated expiration time -func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) { +func (c *CacheStore) AddOperation(service, operation string, kind model.SpanKind, keyTTL uint64) { c.cacheLock.Lock() defer c.cacheLock.Unlock() if _, found := c.operations[service]; !found { - c.operations[service] = make(map[string]uint64) + c.operations[service] = make(map[model.SpanKind]map[string]uint64) } - if v, found := c.operations[service][operation]; found { + if _, found := c.operations[service][kind]; !found { + c.operations[service][kind] = make(map[string]uint64) + } + if v, found := c.operations[service][kind][operation]; found { if v > keyTTL { return } } - c.operations[service][operation] = keyTTL + c.operations[service][kind][operation] = keyTTL } // Update caches the results of service and service + operation indexes and maintains their TTL -func (c *CacheStore) Update(service, operation string, expireTime uint64) { +func (c *CacheStore) Update(service, operation string, kind model.SpanKind, expireTime uint64) { c.cacheLock.Lock() c.services[service] = expireTime - if _, ok := c.operations[service]; !ok { - c.operations[service] = make(map[string]uint64) + if _, found := c.operations[service]; !found { + c.operations[service] = make(map[model.SpanKind]map[string]uint64) + } + if _, found := c.operations[service][kind]; !found { + c.operations[service][kind] = make(map[string]uint64) } - c.operations[service][operation] = expireTime + c.operations[service][kind][operation] = expireTime c.cacheLock.Unlock() } // GetOperations returns all operations for a specific service & spanKind traced by Jaeger -func (c *CacheStore) GetOperations(service string) ([]spanstore.Operation, error) { - operations := make([]string, 0, len(c.services)) +func (c *CacheStore) GetOperations(service string, kind string) ([]spanstore.Operation, error) { + operations := make([]spanstore.Operation, 0, len(c.services)) //nolint: gosec // G115 - t := uint64(time.Now().Unix()) + currentTime := uint64(time.Now().Unix()) c.cacheLock.Lock() defer c.cacheLock.Unlock() - - if v, ok := c.services[service]; ok { - if v < t { + if expiryTimeOfService, ok := c.services[service]; ok { + if expiryTimeOfService < currentTime { // Expired, remove delete(c.services, service) delete(c.operations, service) return []spanstore.Operation{}, nil // empty slice rather than nil } - for o, e := range c.operations[service] { - if e > t { - operations = append(operations, o) - } else { - delete(c.operations[service], o) + for sKind := range c.operations[service] { + if kind != "" && kind != string(sKind) { + continue + } + for o, expiryTimeOfOperation := range c.operations[service][sKind] { + if 
expiryTimeOfOperation > currentTime { + op := spanstore.Operation{Name: o, SpanKind: string(sKind)} + operations = append(operations, op) + } else { + delete(c.operations[service][sKind], o) + } + sort.Slice(operations, func(i, j int) bool { + if operations[i].SpanKind == operations[j].SpanKind { + return operations[i].Name < operations[j].Name + } + return operations[i].SpanKind < operations[j].SpanKind + }) } } } - - sort.Strings(operations) - - // TODO: https://github.com/jaegertracing/jaeger/issues/1922 - // - return the operations with actual spanKind - result := make([]spanstore.Operation, 0, len(operations)) - for _, op := range operations { - result = append(result, spanstore.Operation{ - Name: op, - }) - } - return result, nil + return operations, nil } // GetServices returns all services traced by Jaeger diff --git a/plugin/storage/badger/spanstore/cache_test.go b/plugin/storage/badger/spanstore/cache_test.go index 8d3fa151fb0..c6b7ba455f8 100644 --- a/plugin/storage/badger/spanstore/cache_test.go +++ b/plugin/storage/badger/spanstore/cache_test.go @@ -4,28 +4,42 @@ package spanstore import ( + "context" + "fmt" + "sort" "testing" "time" "github.com/dgraph-io/badger/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/jaegertracing/jaeger/model" ) /* Additional cache store tests that need to access internal parts. As such, package must be spanstore and not spanstore_test */ +var spanKinds = []model.SpanKind{ + model.SpanKindUnspecified, + model.SpanKindInternal, + model.SpanKindClient, + model.SpanKindServer, + model.SpanKindProducer, + model.SpanKindConsumer, +} + func TestExpiredItems(t *testing.T) { - runWithBadger(t, func(store *badger.DB, t *testing.T) { - cache := NewCacheStore(store, time.Duration(-1*time.Hour)) + runWithBadger(t, func(_ *badger.DB, t *testing.T) { + cache := NewCacheStore(time.Duration(-1 * time.Hour)) expireTime := uint64(time.Now().Add(cache.ttl).Unix()) // Expired service - cache.Update("service1", "op1", expireTime) - cache.Update("service1", "op2", expireTime) + cache.Update("service1", "op1", model.SpanKindUnspecified, expireTime) + cache.Update("service1", "op2", model.SpanKindUnspecified, expireTime) services, err := cache.GetServices() require.NoError(t, err) @@ -33,26 +47,103 @@ func TestExpiredItems(t *testing.T) { // Expired service for operations - cache.Update("service1", "op1", expireTime) - cache.Update("service1", "op2", expireTime) + cache.Update("service1", "op1", model.SpanKindUnspecified, expireTime) + cache.Update("service1", "op2", model.SpanKindUnspecified, expireTime) - operations, err := cache.GetOperations("service1") + operations, err := cache.GetOperations("service1", "") require.NoError(t, err) assert.Empty(t, operations) // Everything should be expired // Expired operations, stable service - cache.Update("service1", "op1", expireTime) - cache.Update("service1", "op2", expireTime) + cache.Update("service1", "op1", model.SpanKindUnspecified, expireTime) + cache.Update("service1", "op2", model.SpanKindUnspecified, expireTime) cache.services["service1"] = uint64(time.Now().Unix() + 1e10) - operations, err = cache.GetOperations("service1") + operations, err = cache.GetOperations("service1", "") require.NoError(t, err) assert.Empty(t, operations) // Everything should be expired }) } +func TestCacheStore_GetOperations(t *testing.T) { + runWithBadger(t, func(_ *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + expireTime := uint64(time.Now().Add(cache.ttl).Unix()) + 
serviceName := "service1" + operationName := "op1" + for i := 0; i <= 5; i++ { + cache.Update(serviceName, operationName, spanKinds[i], expireTime) + } + operations, err := cache.GetOperations(serviceName, "") + require.NoError(t, err) + assert.Len(t, operations, 6) + var kinds []string + for i := 0; i <= 5; i++ { + kinds = append(kinds, string(spanKinds[i])) + } + // This is necessary as we want to check whether the result is sorted or not + sort.Strings(kinds) + for i := 0; i <= 5; i++ { + assert.Equal(t, kinds[i], operations[i].SpanKind) + assert.Equal(t, operationName, operations[i].Name) + if i != 0 { + k := kinds[i] + singleKindOperations, err := cache.GetOperations(serviceName, k) + require.NoError(t, err) + assert.Len(t, singleKindOperations, 1) + assert.Equal(t, kinds[i], singleKindOperations[0].SpanKind) + assert.Equal(t, operationName, singleKindOperations[0].Name) + } + } + }) +} + +func TestCacheStore_Update(t *testing.T) { + runWithBadger(t, func(_ *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + expireTime := uint64(time.Now().Add(cache.ttl).Unix()) + serviceName := "service1" + operationName := "op1" + for i := 0; i <= 5; i++ { + cache.Update(serviceName, operationName, spanKinds[i], expireTime) + assert.Equal(t, expireTime, cache.operations[serviceName][spanKinds[i]][operationName]) + } + }) +} + +func TestCacheStore_GetOperationsReturnsEmptyOperationsWithWrongSpanKind(t *testing.T) { + runWithBadger(t, func(_ *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + cache.Update("service", "operation", model.SpanKindUnspecified, uint64(time.Now().Unix())) + operations, err := cache.GetOperations("service1", "a") + require.NoError(t, err) + assert.Empty(t, operations) + }) +} + +func TestCacheStore_GetOperationsSameKind(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + writer := NewSpanWriter(store, cache, 1*time.Hour) + for i := 0; i < 5; i++ { + service := "service" + operation := fmt.Sprintf("op%d", i) + span := createDummySpanWithKind(service, operation, model.SpanKindServer, true) + err := writer.WriteSpan(context.Background(), span) + require.NoError(t, err) + } + operations, err := cache.GetOperations("service", string(model.SpanKindServer)) + require.NoError(t, err) + assert.Len(t, operations, 5) + for i, operation := range operations { + assert.Equal(t, fmt.Sprintf("op%d", i), operation.Name) + assert.Equal(t, string(model.SpanKindServer), operation.SpanKind) + } + }) +} + // func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader)) { func runWithBadger(t *testing.T, test func(store *badger.DB, t *testing.T)) { opts := badger.DefaultOptions("") @@ -71,3 +162,34 @@ func runWithBadger(t *testing.T, test func(store *badger.DB, t *testing.T)) { test(store, t) } + +func createDummySpanWithKind(service string, operation string, kind model.SpanKind, includeSpanKind bool) *model.Span { + var tags model.KeyValues + if includeSpanKind { + tags = model.KeyValues{ + model.KeyValue{ + Key: "span.kind", + VType: model.StringType, + VStr: string(kind), + }, + } + } else { + tags = model.KeyValues{} + } + tid := time.Now() + testSpan := model.Span{ + TraceID: model.TraceID{ + Low: uint64(0), + High: 1, + }, + SpanID: model.SpanID(0), + OperationName: operation, + Process: &model.Process{ + ServiceName: service, + }, + StartTime: tid.Add(1 * time.Millisecond), + Duration: 1 * time.Millisecond, + Tags: tags, + } + return &testSpan +} diff 
--git a/plugin/storage/badger/spanstore/kind.go b/plugin/storage/badger/spanstore/kind.go new file mode 100644 index 00000000000..153e7f36ca5 --- /dev/null +++ b/plugin/storage/badger/spanstore/kind.go @@ -0,0 +1,65 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package spanstore + +import ( + "github.com/jaegertracing/jaeger/model" +) + +type badgerSpanKind byte + +const ( + badgerSpanKindUnspecified badgerSpanKind = 0x86 + badgerSpanKindInternal badgerSpanKind = 0x87 + badgerSpanKindConsumer badgerSpanKind = 0x88 + badgerSpanKindProducer badgerSpanKind = 0x89 + badgerSpanKindServer badgerSpanKind = 0x90 + badgerSpanKindClient badgerSpanKind = 0x91 +) + +var toSpanKind = map[badgerSpanKind]model.SpanKind{ + badgerSpanKindInternal: model.SpanKindInternal, + badgerSpanKindConsumer: model.SpanKindConsumer, + badgerSpanKindProducer: model.SpanKindProducer, + badgerSpanKindServer: model.SpanKindServer, + badgerSpanKindClient: model.SpanKindClient, + badgerSpanKindUnspecified: model.SpanKindUnspecified, +} + +var fromSpanKind = map[model.SpanKind]badgerSpanKind{ + model.SpanKindInternal: badgerSpanKindInternal, + model.SpanKindConsumer: badgerSpanKindConsumer, + model.SpanKindProducer: badgerSpanKindProducer, + model.SpanKindServer: badgerSpanKindServer, + model.SpanKindClient: badgerSpanKindClient, + model.SpanKindUnspecified: badgerSpanKindUnspecified, +} + +func (b badgerSpanKind) spanKind() model.SpanKind { + if kind, ok := toSpanKind[b]; ok { + return kind + } + return model.SpanKindUnspecified +} + +func (b badgerSpanKind) validateAndGet() badgerSpanKind { + switch b { + case badgerSpanKindInternal, badgerSpanKindConsumer, badgerSpanKindProducer, badgerSpanKindServer, badgerSpanKindClient: + return b + default: + return badgerSpanKindUnspecified + } +} + +func getBadgerSpanKind(kind model.SpanKind) badgerSpanKind { + if bKind, ok := fromSpanKind[kind]; ok { + return bKind + } + return badgerSpanKindUnspecified +} + +func getBadgerSpanKindFromByte(kind byte) badgerSpanKind { + badgerKind := badgerSpanKind(kind) + return badgerKind.validateAndGet() +} diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 00c1fa4dfc9..6a132520ff9 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -55,8 +55,9 @@ const ( // TraceReader reads traces from the local badger store type TraceReader struct { - store *badger.DB - cache *CacheStore + store *badger.DB + cache *CacheStore + dualLookUp bool } // executionPlan is internal structure to track the index filtering @@ -74,15 +75,19 @@ type executionPlan struct { } // NewTraceReader returns a TraceReader with cache -func NewTraceReader(db *badger.DB, c *CacheStore, prefillCache bool) *TraceReader { +func NewTraceReader(db *badger.DB, c *CacheStore, prefillCache bool, dualLookUp bool) *TraceReader { reader := &TraceReader{ - store: db, - cache: c, + store: db, + cache: c, + dualLookUp: dualLookUp, } if prefillCache { services := reader.preloadServices() for _, service := range services { - reader.preloadOperations(service) + if reader.dualLookUp { + reader.preloadOperations(service) + } + reader.preloadOperationsWthKind(service) } } return reader @@ -246,7 +251,7 @@ func (r *TraceReader) GetOperations( _ context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - return r.cache.GetOperations(query.ServiceName) + return r.cache.GetOperations(query.ServiceName, query.SpanKind) } // 
setQueryDefaults alters the query with defaults if certain parameters are not set @@ -257,7 +262,7 @@ func setQueryDefaults(query *spanstore.TraceQueryParameters) { } // serviceQueries parses the query to index seeks which are unique index seeks -func serviceQueries(query *spanstore.TraceQueryParameters, indexSeeks [][]byte) [][]byte { +func serviceQueries(query *spanstore.TraceQueryParameters, indexSeeks [][]byte, operationPrefix byte) [][]byte { if query.ServiceName != "" { indexSearchKey := make([]byte, 0, 64) // 64 is a magic guess tagQueryUsed := false @@ -271,13 +276,13 @@ func serviceQueries(query *spanstore.TraceQueryParameters, indexSeeks [][]byte) } if query.OperationName != "" { - indexSearchKey = append(indexSearchKey, operationNameIndexKey) - indexSearchKey = append(indexSearchKey, []byte(query.ServiceName+query.OperationName)...) + prefix := []byte(query.ServiceName + query.OperationName) + indexSearchKey = append(indexSearchKey, operationPrefix) + indexSearchKey = append(indexSearchKey, prefix...) } else if !tagQueryUsed { // Tag query already reduces the search set with a serviceName indexSearchKey = append(indexSearchKey, serviceNameIndexKey) indexSearchKey = append(indexSearchKey, []byte(query.ServiceName)...) } - if len(indexSearchKey) > 0 { indexSeeks = append(indexSeeks, indexSearchKey) } @@ -369,7 +374,6 @@ func buildHash(plan *executionPlan, outerIDs [][]byte) map[model.TraceID]struct{ hashed := make(map[model.TraceID]struct{}) for i := 0; i < len(outerIDs); i++ { trID := bytesToTraceID(outerIDs[i]) - if plan.hashOuter != nil { if _, exists := plan.hashOuter[trID]; exists { hashed[trID] = empty @@ -379,7 +383,6 @@ func buildHash(plan *executionPlan, outerIDs [][]byte) map[model.TraceID]struct{ hashed[trID] = empty } } - return hashed } @@ -470,7 +473,7 @@ func (r *TraceReader) FindTraceIDs(_ context.Context, query *spanstore.TraceQuer // Find matches using indexes that are using service as part of the key indexSeeks := make([][]byte, 0, 1) - indexSeeks = serviceQueries(query, indexSeeks) + indexSeeks = serviceQueries(query, indexSeeks, operationNameWithKindIndexKey) startStampBytes := make([]byte, 8) binary.BigEndian.PutUint64(startStampBytes, model.TimeAsEpochMicroseconds(query.StartTimeMin)) @@ -483,23 +486,66 @@ func (r *TraceReader) FindTraceIDs(_ context.Context, query *spanstore.TraceQuer startTimeMax: endStampBytes, limit: query.NumTraces, } - + var hashOuter map[model.TraceID]struct{} if query.DurationMax != 0 || query.DurationMin != 0 { - plan.hashOuter = r.durationQueries(plan, query) + hashOuter = r.durationQueries(plan, query) + plan.hashOuter = hashOuter } + var keys []model.TraceID + var err error if len(indexSeeks) > 0 { - keys, err := r.indexSeeksToTraceIDs(plan, indexSeeks) + keys, err = r.indexSeeksToTraceIDs(plan, indexSeeks) if err != nil { return nil, err } - + } + if r.dualLookUp && len(keys) < query.NumTraces { + oldIndexPresent, err := r.oldOperationIndexExists() + if err != nil { + return nil, err + } + if oldIndexPresent { + oldIndexSeeks := make([][]byte, 0, 1) + oldIndexSeeks = serviceQueries(query, oldIndexSeeks, operationNameIndexKey) + if len(oldIndexSeeks) > 0 { + // This new plan is necessary as merge outer and hash outers should be unique + oldIndexPlan := &executionPlan{ + startTimeMin: startStampBytes, + startTimeMax: endStampBytes, + limit: query.NumTraces - len(keys), + } + oldIndexPlan.hashOuter = hashOuter + oldKeys, err := r.indexSeeksToTraceIDs(oldIndexPlan, oldIndexSeeks) + if err != nil { + return nil, err + } + keys = 
append(keys, oldKeys...) + } + } + } + if len(keys) > 0 { return keys, nil } - return r.scanTimeRange(plan) } +func (r *TraceReader) oldOperationIndexExists() (bool, error) { + found := false + err := r.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + iter := txn.NewIterator(opts) + defer iter.Close() + for iter.Seek([]byte{operationNameIndexKey}); iter.ValidForPrefix([]byte{operationNameIndexKey}); iter.Next() { + found = true + return nil + } + return nil + }) + return found, err +} + // validateQuery returns an error if certain restrictions are not met func validateQuery(p *spanstore.TraceQueryParameters) error { if p == nil { @@ -527,6 +573,8 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, plan *executionPlan) ([][]byte, error) { indexResults := make([][]byte, 0) + endingIndexOfTimeStampFromLast := getEndingIndexOfTimeStampFromLast(indexKeyValue) + err := r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = false // Don't fetch values since we're only interested in the keys @@ -542,13 +590,11 @@ func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, plan *executionPlan) ( for it.Seek(startIndex); scanFunction(it, indexKeyValue, plan.startTimeMin); it.Next() { item := it.Item() - // ScanFunction is a prefix scanning (since we could have for example service1 & service12) // Now we need to match only the exact key if we want to add it - timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // timestamp is stored with 8 bytes + timestampStartIndex := len(it.Item().Key()) - (endingIndexOfTimeStampFromLast + 8) // timestamp is stored with 8 bytes if bytes.Equal(indexKeyValue, it.Item().Key()[:timestampStartIndex]) { traceIDBytes := item.Key()[len(item.Key())-sizeOfTraceID:] - traceIDCopy := make([]byte, sizeOfTraceID) copy(traceIDCopy, traceIDBytes) indexResults = append(indexResults, traceIDCopy) @@ -556,20 +602,26 @@ func (r *TraceReader) scanIndexKeys(indexKeyValue []byte, plan *executionPlan) ( } return nil }) - return indexResults, err } +func getEndingIndexOfTimeStampFromLast(key []byte) int { + if key[0] == operationNameWithKindIndexKey { + return sizeOfTraceID + 1 + } + return sizeOfTraceID +} + // scanFunction compares the index name as well as the time range in the index key func scanFunction(it *badger.Iterator, indexPrefix []byte, timeBytesEnd []byte) bool { if it.Valid() { + startIndexOfTraceId := getEndingIndexOfTimeStampFromLast(it.Item().Key()) // We can't use the indexPrefix length, because we might have the same prefixValue for non-matching cases also - timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // timestamp is stored with 8 bytes + timestampStartIndex := len(it.Item().Key()) - (startIndexOfTraceId + 8) // timestamp is stored with 8 bytes timestamp := it.Item().Key()[timestampStartIndex : timestampStartIndex+8] timestampInRange := bytes.Compare(timeBytesEnd, timestamp) <= 0 - // Check length as well to prevent theoretical case where timestamp might match with wrong index key - if len(it.Item().Key()) != len(indexPrefix)+24 { + if len(it.Item().Key()) != len(indexPrefix)+24+getExtraLengthOfKey(it.Item().Key()) { return false } @@ -578,6 +630,13 @@ func scanFunction(it *badger.Iterator, indexPrefix []byte, timeBytesEnd []byte) return false } +func getExtraLengthOfKey(key []byte) int { + if key[0] == operationNameWithKindIndexKey { + return 1 + } + 
return 0 +} + // scanRangeIndex scans the time range for index keys matching the given prefix. func (r *TraceReader) scanRangeIndex(plan *executionPlan, indexStartValue []byte, indexEndValue []byte) ([][]byte, error) { indexResults := make([][]byte, 0) @@ -647,6 +706,7 @@ func (r *TraceReader) preloadServices() []string { func (r *TraceReader) preloadOperations(service string) { r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false it := txn.NewIterator(opts) defer it.Close() @@ -659,7 +719,29 @@ func (r *TraceReader) preloadOperations(service string) { timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64) operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex]) keyTTL := it.Item().ExpiresAt() - r.cache.AddOperation(service, operationName, keyTTL) + kind := model.SpanKindUnspecified + r.cache.AddOperation(service, operationName, kind, keyTTL) + } + return nil + }) +} + +func (r *TraceReader) preloadOperationsWthKind(service string) { + r.store.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + serviceKey := make([]byte, len(service)+1) + serviceKey[0] = operationNameWithKindIndexKey + copy(serviceKey[1:], service) + for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() { + timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8 + 1) + operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex]) + keyTTL := it.Item().ExpiresAt() + badgerKind := getBadgerSpanKindFromByte(it.Item().Key()[timestampStartIndex+8]) + kind := badgerKind.spanKind() + r.cache.AddOperation(service, operationName, kind, keyTTL) } return nil }) diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index ced19506b90..e2bef8e3a2b 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -6,6 +6,7 @@ package spanstore import ( "context" "encoding/binary" + "fmt" "math/rand" "testing" "time" @@ -23,9 +24,9 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour)) + cache := NewCacheStore(time.Duration(1 * time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache, true) + rw := NewTraceReader(store, cache, true, true) sw.encodingType = jsonEncoding err := sw.WriteSpan(context.Background(), &testSpan) @@ -40,7 +41,7 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour)) + cache := NewCacheStore(time.Duration(1 * time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) // rw := NewTraceReader(store, cache) @@ -53,9 +54,9 @@ func TestEncodingTypes(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour)) + cache := NewCacheStore(time.Duration(1 * time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache, true) + rw := NewTraceReader(store, cache, true, true) err := sw.WriteSpan(context.Background(), &testSpan) require.NoError(t, err) @@ -92,9 +93,9 @@ func 
TestDecodeErrorReturns(t *testing.T) { func TestDuplicateTraceIDDetection(t *testing.T) { runWithBadger(t, func(store *badger.DB, t *testing.T) { testSpan := createDummySpan() - cache := NewCacheStore(store, time.Duration(1*time.Hour)) + cache := NewCacheStore(time.Duration(1 * time.Hour)) sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour)) - rw := NewTraceReader(store, cache, true) + rw := NewTraceReader(store, cache, true, true) origStartTime := testSpan.StartTime traceCount := 128 @@ -212,20 +213,106 @@ func TestOldReads(t *testing.T) { }) } - cache := NewCacheStore(store, time.Duration(-1*time.Hour)) + cache := NewCacheStore(time.Duration(-1 * time.Hour)) writer() nuTid := tid.Add(1 * time.Hour) - cache.Update("service1", "operation1", uint64(tid.Unix())) + cache.Update("service1", "operation1", model.SpanKindUnspecified, uint64(tid.Unix())) cache.services["service1"] = uint64(nuTid.Unix()) - cache.operations["service1"]["operation1"] = uint64(nuTid.Unix()) + cache.operations["service1"][model.SpanKindUnspecified]["operation1"] = uint64(nuTid.Unix()) // This is equivalent to populate caches of cache - _ = NewTraceReader(store, cache, true) + _ = NewTraceReader(store, cache, true, true) // Now make sure we didn't use the older timestamps from the DB assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"]) - assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"]) + assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"][model.SpanKindUnspecified]["operation1"]) + }) +} + +// Code Coverage Test +func TestCacheStore_WrongSpanKindFromBadger(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + writer := NewSpanWriter(store, cache, 1*time.Hour) + _ = NewTraceReader(store, cache, true, true) + span := createDummySpanWithKind("service", "operation", model.SpanKind("New Kind"), true) + err := writer.WriteSpan(context.Background(), span) + require.NoError(t, err) + newCache := NewCacheStore(1 * time.Hour) + _ = NewTraceReader(store, newCache, true, true) + services, err := newCache.GetServices() + require.NoError(t, err) + assert.Len(t, services, 1) + assert.Equal(t, "service", services[0]) + operations, err := newCache.GetOperations("service", "") + require.NoError(t, err) + assert.Len(t, operations, 1) + assert.Equal(t, "", operations[0].SpanKind) + assert.Equal(t, "operation", operations[0].Name) + }) +} + +// Test Case for old data +func TestCacheStore_WhenValueIsNil(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + w := NewSpanWriter(store, cache, 1*time.Hour) + _ = NewTraceReader(store, cache, true, true) + var entriesToStore []*badger.Entry + timeNow := model.TimeAsEpochMicroseconds(time.Now()) + expireTime := uint64(time.Now().Add(cache.ttl).Unix()) + entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(serviceNameIndexKey, []byte("service"), timeNow, model.TraceID{High: 0, Low: 0}), nil, expireTime)) + entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(operationNameIndexKey, []byte("serviceoperation"), timeNow, model.TraceID{High: 0, Low: 0}), nil, expireTime)) + err := store.Update(func(txn *badger.Txn) error { + err := txn.SetEntry(entriesToStore[0]) + require.NoError(t, err) + err = txn.SetEntry(entriesToStore[1]) + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + newCache := NewCacheStore(1 * time.Hour) + _ = NewTraceReader(store, 
newCache, true, true) + services, err := newCache.GetServices() + require.NoError(t, err) + assert.Len(t, services, 1) + assert.Equal(t, "service", services[0]) + operations, err := newCache.GetOperations("service", "") + require.NoError(t, err) + assert.Len(t, operations, 1) + assert.Equal(t, "", operations[0].SpanKind) + assert.Equal(t, "operation", operations[0].Name) + }) +} + +func TestCacheStore_Prefill(t *testing.T) { + runWithBadger(t, func(store *badger.DB, t *testing.T) { + cache := NewCacheStore(1 * time.Hour) + writer := NewSpanWriter(store, cache, 1*time.Hour) + var spans []*model.Span + // Write a span without kind also + spanWithoutKind := createDummySpanWithKind("service0", "op0", model.SpanKindUnspecified, false) + spans = append(spans, spanWithoutKind) + err := writer.WriteSpan(context.Background(), spanWithoutKind) + require.NoError(t, err) + for i := 1; i < 6; i++ { + service := fmt.Sprintf("service%d", i) + operation := fmt.Sprintf("op%d", i) + span := createDummySpanWithKind(service, operation, spanKinds[i], true) + spans = append(spans, span) + err = writer.WriteSpan(context.Background(), span) + require.NoError(t, err) + } + // Create a new cache for testing prefill as old span will consist the data from update called from WriteSpan + newCache := NewCacheStore(1 * time.Hour) + _ = NewTraceReader(store, newCache, true, true) + for i, span := range spans { + _, foundService := newCache.services[span.Process.ServiceName] + assert.True(t, foundService) + _, foundOperation := newCache.operations[span.Process.ServiceName][spanKinds[i]][span.OperationName] + assert.True(t, foundOperation) + } }) } diff --git a/plugin/storage/badger/spanstore/writer.go b/plugin/storage/badger/spanstore/writer.go index 54680438cb8..2fb5180ab75 100644 --- a/plugin/storage/badger/spanstore/writer.go +++ b/plugin/storage/badger/spanstore/writer.go @@ -24,15 +24,16 @@ import ( */ const ( - spanKeyPrefix byte = 0x80 // All span keys should have first bit set to 1 - indexKeyRange byte = 0x0F // Secondary indexes use last 4 bits - serviceNameIndexKey byte = 0x81 - operationNameIndexKey byte = 0x82 - tagIndexKey byte = 0x83 - durationIndexKey byte = 0x84 - jsonEncoding byte = 0x01 // Last 4 bits of the meta byte are for encoding type - protoEncoding byte = 0x02 // Last 4 bits of the meta byte are for encoding type - defaultEncoding byte = protoEncoding + spanKeyPrefix byte = 0x80 // All span keys should have first bit set to 1 + indexKeyRange byte = 0x0F // Secondary indexes use last 4 bits + serviceNameIndexKey byte = 0x81 + operationNameIndexKey byte = 0x82 + tagIndexKey byte = 0x83 + durationIndexKey byte = 0x84 + operationNameWithKindIndexKey byte = 0x85 + jsonEncoding byte = 0x01 // Last 4 bits of the meta byte are for encoding type + protoEncoding byte = 0x02 // Last 4 bits of the meta byte are for encoding type + defaultEncoding byte = protoEncoding ) // SpanWriter for writing spans to badger @@ -55,6 +56,11 @@ func NewSpanWriter(db *badger.DB, c *CacheStore, ttl time.Duration) *SpanWriter // WriteSpan writes the encoded span as well as creates indexes with defined TTL func (w *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { + return w.writeSpan(span, false) +} + +// This method is to test backward compatibility for old index key +func (w *SpanWriter) writeSpan(span *model.Span, writeOldIndex bool) error { //nolint: gosec // G115 expireTime := uint64(time.Now().Add(w.ttl).Unix()) startTime := model.TimeAsEpochMicroseconds(span.StartTime) @@ -67,9 +73,32 @@ func (w 
*SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { return err } + kind, _ := span.GetSpanKind() + entriesToStore = append(entriesToStore, trace) entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(serviceNameIndexKey, []byte(span.Process.ServiceName), startTime, span.TraceID), nil, expireTime)) - entriesToStore = append(entriesToStore, w.createBadgerEntry(createIndexKey(operationNameIndexKey, []byte(span.Process.ServiceName+span.OperationName), startTime, span.TraceID), nil, expireTime)) + if writeOldIndex { + entriesToStore = append( + entriesToStore, + w.createBadgerEntry( + createIndexKey( + operationNameIndexKey, + []byte(span.Process.ServiceName+span.OperationName), + startTime, + span.TraceID, + ), + nil, + expireTime), + ) + } else { + entriesToStore = append( + entriesToStore, + w.createBadgerEntry( + createOperationWithKindIndexKey(span, kind, startTime), + nil, + expireTime), + ) + } // It doesn't matter if we overwrite Duration index keys, everything is read at Trace level in any case durationValue := make([]byte, 8) @@ -109,7 +138,7 @@ func (w *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { }) // Do cache refresh here to release the transaction earlier - w.cache.Update(span.Process.ServiceName, span.OperationName, expireTime) + w.cache.Update(span.Process.ServiceName, span.OperationName, kind, expireTime) return err } @@ -128,6 +157,24 @@ func createIndexKey(indexPrefixKey byte, value []byte, startTime uint64, traceID return key } +func createOperationWithKindIndexKey(span *model.Span, kind model.SpanKind, startTime uint64) []byte { + // KEY: indexKey (traceId is last 16 bytes of the key) + value := []byte(span.Process.ServiceName + span.OperationName) + traceID := span.TraceID + key := make([]byte, 1+len(value)+8+1+sizeOfTraceID) + key[0] = operationNameWithKindIndexKey + pos := len(value) + 1 + copy(key[1:pos], value) + binary.BigEndian.PutUint64(key[pos:], startTime) + pos += 8 + key[pos] = byte(getBadgerSpanKind(kind)) + pos++ + binary.BigEndian.PutUint64(key[pos:], traceID.High) + pos += 8 // sizeOfTraceID / 2 + binary.BigEndian.PutUint64(key[pos:], traceID.Low) + return key +} + func (*SpanWriter) createBadgerEntry(key []byte, value []byte, expireTime uint64) *badger.Entry { return &badger.Entry{ Key: key, diff --git a/plugin/storage/integration/badgerstore_test.go b/plugin/storage/integration/badgerstore_test.go index 3a9f7c2f436..04bc7700d24 100644 --- a/plugin/storage/integration/badgerstore_test.go +++ b/plugin/storage/integration/badgerstore_test.go @@ -57,9 +57,6 @@ func TestBadgerStorage(t *testing.T) { s := &BadgerIntegrationStorage{ StorageIntegration: StorageIntegration{ SkipArchiveTest: true, - - // TODO: remove this badger supports returning spanKind from GetOperations - GetOperationsMissingSpanKind: true, }, } s.CleanUp = s.cleanUp
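Note on the index layout introduced above: the new operationNameWithKindIndexKey (0x85) entries written by createOperationWithKindIndexKey pack the span kind into a single byte placed between the 8-byte start timestamp and the 16-byte trace ID, while the legacy operationNameIndexKey (0x82) entries carry no kind byte. That one-byte difference is exactly what getEndingIndexOfTimeStampFromLast and getExtraLengthOfKey compensate for in the reader, and dual lookup falls back to the old 0x82 keys only when they still exist in the store and the new index returned fewer than NumTraces results. The helper below is a hypothetical sketch, not part of this change; decodeOperationWithKindIndexKey is an invented name that simply mirrors the writer's layout, reusing identifiers that already exist in the spanstore package (sizeOfTraceID, getBadgerSpanKindFromByte).

package spanstore

import (
	"encoding/binary"

	"github.com/jaegertracing/jaeger/model"
)

// decodeOperationWithKindIndexKey is a hypothetical inverse of
// createOperationWithKindIndexKey, shown only to document the key layout:
//
//	key[0]               operationNameWithKindIndexKey (0x85)
//	key[1 : len-25]      service name + operation name
//	key[len-25 : len-17] start time, big-endian epoch microseconds
//	key[len-17]          badger span kind byte
//	key[len-16 : len]    trace ID (High, Low), big-endian
func decodeOperationWithKindIndexKey(key []byte) (serviceAndOperation string, startTime uint64, kind model.SpanKind, traceID model.TraceID) {
	suffix := 8 + 1 + sizeOfTraceID // timestamp + kind byte + trace ID
	serviceAndOperation = string(key[1 : len(key)-suffix])
	pos := len(key) - suffix
	startTime = binary.BigEndian.Uint64(key[pos : pos+8])
	pos += 8
	kind = getBadgerSpanKindFromByte(key[pos]).spanKind()
	pos++
	traceID.High = binary.BigEndian.Uint64(key[pos : pos+8])
	traceID.Low = binary.BigEndian.Uint64(key[pos+8:])
	return serviceAndOperation, startTime, kind, traceID
}

Since includeDualLookUp is registered at StageBeta it is enabled by default; assuming Jaeger v2 exposes the standard OpenTelemetry Collector feature-gate flag, it could presumably be disabled with --feature-gates=-jaeger.badger.dualLookUp once no data written under the old 0x82 index remains.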
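For completeness, a hypothetical caller-side sketch of what this change enables: GetOperations now returns the span kind stored with each operation and can filter on it through spanstore.OperationQueryParameters (an empty SpanKind returns operations of every kind, matching the cache tests above). The snippet wires the reader up directly against an in-memory Badger instance purely for illustration; a real deployment goes through the storage factory shown in factory.go, and this program is not part of the diff.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/dgraph-io/badger/v4"

	"github.com/jaegertracing/jaeger/model"
	badgerstore "github.com/jaegertracing/jaeger/plugin/storage/badger/spanstore"
	"github.com/jaegertracing/jaeger/storage/spanstore"
)

func main() {
	// In-memory Badger instance, similar to runWithBadger in the tests above.
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	cache := badgerstore.NewCacheStore(time.Hour)
	// prefillCache=true, dualLookUp=true (the feature-gate default in this change).
	reader := badgerstore.NewTraceReader(db, cache, true, true)

	// Only operations recorded with the server span kind are returned;
	// leaving SpanKind empty would return operations of all kinds.
	ops, err := reader.GetOperations(context.Background(), spanstore.OperationQueryParameters{
		ServiceName: "service1",
		SpanKind:    string(model.SpanKindServer),
	})
	if err != nil {
		log.Fatal(err)
	}
	for _, op := range ops {
		fmt.Println(op.Name, op.SpanKind)
	}
}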