Skip to content

Commit caccdce

Browse files
authored
[refactor][storage][badger] Moved the prefilling of the cache to the reader (jaegertracing#6575)
## Which problem is this PR solving? Comment: jaegertracing#6376 (comment) ## Description of the changes - Previously the cache contacted the DB directly to prefill itself, which is not a good design. This responsibility now belongs to the reader, which reads from badger and fills the cache. ## How was this change tested? - Unit and e2e tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Manik2708 <[email protected]>
1 parent f6c4be1 commit caccdce

File tree

5 files changed

+124
-115
lines changed

5 files changed

+124
-115
lines changed

plugin/storage/badger/factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
150150
}
151151
f.store = store
152152

153-
f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans, true)
153+
f.cache = badgerStore.NewCacheStore(f.store, f.Config.TTL.Spans)
154154

155155
f.metrics.ValueLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: valueLogSpaceAvailableName})
156156
f.metrics.KeyLogSpaceAvailable = metricsFactory.Gauge(metrics.Options{Name: keyLogSpaceAvailableName})
@@ -176,7 +176,7 @@ func initializeDir(path string) {
176176

177177
// CreateSpanReader implements storage.Factory
178178
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
179-
tr := badgerStore.NewTraceReader(f.store, f.cache)
179+
tr := badgerStore.NewTraceReader(f.store, f.cache, true)
180180
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
181181
}
182182

plugin/storage/badger/spanstore/cache.go

+20-62
Original file line numberDiff line numberDiff line change
@@ -25,83 +25,41 @@ type CacheStore struct {
2525
}
2626

2727
// NewCacheStore returns initialized CacheStore for badger use
28-
func NewCacheStore(db *badger.DB, ttl time.Duration, prefill bool) *CacheStore {
28+
func NewCacheStore(db *badger.DB, ttl time.Duration) *CacheStore {
2929
cs := &CacheStore{
3030
services: make(map[string]uint64),
3131
operations: make(map[string]map[string]uint64),
3232
ttl: ttl,
3333
store: db,
3434
}
35-
36-
if prefill {
37-
cs.populateCaches()
38-
}
3935
return cs
4036
}
4137

42-
func (c *CacheStore) populateCaches() {
38+
// AddService records the service in the cache, keeping the latest expiration time seen for it
39+
func (c *CacheStore) AddService(service string, keyTTL uint64) {
4340
c.cacheLock.Lock()
4441
defer c.cacheLock.Unlock()
45-
46-
c.loadServices()
47-
48-
for k := range c.services {
49-
c.loadOperations(k)
50-
}
51-
}
52-
53-
func (c *CacheStore) loadServices() {
54-
c.store.View(func(txn *badger.Txn) error {
55-
opts := badger.DefaultIteratorOptions
56-
it := txn.NewIterator(opts)
57-
defer it.Close()
58-
59-
serviceKey := []byte{serviceNameIndexKey}
60-
61-
// Seek all the services first
62-
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
63-
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
64-
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
65-
keyTTL := it.Item().ExpiresAt()
66-
if v, found := c.services[serviceName]; found {
67-
if v > keyTTL {
68-
continue
69-
}
70-
}
71-
c.services[serviceName] = keyTTL
42+
if v, found := c.services[service]; found {
43+
if v > keyTTL {
44+
return
7245
}
73-
return nil
74-
})
46+
}
47+
c.services[service] = keyTTL
7548
}
7649

77-
func (c *CacheStore) loadOperations(service string) {
78-
c.store.View(func(txn *badger.Txn) error {
79-
opts := badger.DefaultIteratorOptions
80-
it := txn.NewIterator(opts)
81-
defer it.Close()
82-
83-
serviceKey := make([]byte, len(service)+1)
84-
serviceKey[0] = operationNameIndexKey
85-
copy(serviceKey[1:], service)
86-
87-
// Seek all the services first
88-
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
89-
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
90-
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
91-
keyTTL := it.Item().ExpiresAt()
92-
if _, found := c.operations[service]; !found {
93-
c.operations[service] = make(map[string]uint64)
94-
}
95-
96-
if v, found := c.operations[service][operationName]; found {
97-
if v > keyTTL {
98-
continue
99-
}
100-
}
101-
c.operations[service][operationName] = keyTTL
50+
// AddOperation records the operation of the given service in the cache, keeping the latest expiration time seen for it
51+
func (c *CacheStore) AddOperation(service, operation string, keyTTL uint64) {
52+
c.cacheLock.Lock()
53+
defer c.cacheLock.Unlock()
54+
if _, found := c.operations[service]; !found {
55+
c.operations[service] = make(map[string]uint64)
56+
}
57+
if v, found := c.operations[service][operation]; found {
58+
if v > keyTTL {
59+
return
10260
}
103-
return nil
104-
})
61+
}
62+
c.operations[service][operation] = keyTTL
10563
}
10664

10765
// Update caches the results of service and service + operation indexes and maintains their TTL

plugin/storage/badger/spanstore/cache_test.go

+1-42
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"github.com/dgraph-io/badger/v4"
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
13-
14-
"github.com/jaegertracing/jaeger/model"
1513
)
1614

1715
/*
@@ -20,7 +18,7 @@ import (
2018

2119
func TestExpiredItems(t *testing.T) {
2220
runWithBadger(t, func(store *badger.DB, t *testing.T) {
23-
cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)
21+
cache := NewCacheStore(store, time.Duration(-1*time.Hour))
2422

2523
expireTime := uint64(time.Now().Add(cache.ttl).Unix())
2624

@@ -55,45 +53,6 @@ func TestExpiredItems(t *testing.T) {
5553
})
5654
}
5755

58-
func TestOldReads(t *testing.T) {
59-
runWithBadger(t, func(store *badger.DB, t *testing.T) {
60-
timeNow := model.TimeAsEpochMicroseconds(time.Now())
61-
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
62-
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})
63-
64-
tid := time.Now().Add(1 * time.Minute)
65-
66-
writer := func() {
67-
store.Update(func(txn *badger.Txn) error {
68-
txn.SetEntry(&badger.Entry{
69-
Key: s1Key,
70-
ExpiresAt: uint64(tid.Unix()),
71-
})
72-
txn.SetEntry(&badger.Entry{
73-
Key: s1o1Key,
74-
ExpiresAt: uint64(tid.Unix()),
75-
})
76-
return nil
77-
})
78-
}
79-
80-
cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)
81-
writer()
82-
83-
nuTid := tid.Add(1 * time.Hour)
84-
85-
cache.Update("service1", "operation1", uint64(tid.Unix()))
86-
cache.services["service1"] = uint64(nuTid.Unix())
87-
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())
88-
89-
cache.populateCaches()
90-
91-
// Now make sure we didn't use the older timestamps from the DB
92-
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
93-
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
94-
})
95-
}
96-
9756
// func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader)) {
9857
func runWithBadger(t *testing.T, test func(store *badger.DB, t *testing.T)) {
9958
opts := badger.DefaultOptions("")

plugin/storage/badger/spanstore/reader.go

+54-2
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,18 @@ type executionPlan struct {
7474
}
7575

7676
// NewTraceReader returns a TraceReader with cache
77-
func NewTraceReader(db *badger.DB, c *CacheStore) *TraceReader {
78-
return &TraceReader{
77+
func NewTraceReader(db *badger.DB, c *CacheStore, prefillCache bool) *TraceReader {
78+
reader := &TraceReader{
7979
store: db,
8080
cache: c,
8181
}
82+
if prefillCache {
83+
services := reader.preloadServices()
84+
for _, service := range services {
85+
reader.preloadOperations(service)
86+
}
87+
}
88+
return reader
8289
}
8390

8491
func decodeValue(val []byte, encodeType byte) (*model.Span, error) {
@@ -612,3 +619,48 @@ func scanRangeFunction(it *badger.Iterator, indexEndValue []byte) bool {
612619
}
613620
return false
614621
}
622+
623+
// preloadServices reads the service names from badger, adds them to the cache and returns them
624+
func (r *TraceReader) preloadServices() []string {
625+
var services []string
626+
r.store.View(func(txn *badger.Txn) error {
627+
opts := badger.DefaultIteratorOptions
628+
it := txn.NewIterator(opts)
629+
defer it.Close()
630+
631+
serviceKey := []byte{serviceNameIndexKey}
632+
633+
// Seek all the services first
634+
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
635+
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
636+
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
637+
keyTTL := it.Item().ExpiresAt()
638+
services = append(services, serviceName)
639+
r.cache.AddService(serviceName, keyTTL)
640+
}
641+
return nil
642+
})
643+
return services
644+
}
645+
646+
// preloadOperations reads all operations of the specified service from badger and adds them to the cache
647+
func (r *TraceReader) preloadOperations(service string) {
648+
r.store.View(func(txn *badger.Txn) error {
649+
opts := badger.DefaultIteratorOptions
650+
it := txn.NewIterator(opts)
651+
defer it.Close()
652+
653+
serviceKey := make([]byte, len(service)+1)
654+
serviceKey[0] = operationNameIndexKey
655+
copy(serviceKey[1:], service)
656+
657+
// Iterate over all operation index keys that start with this service's prefix
658+
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
659+
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
660+
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
661+
keyTTL := it.Item().ExpiresAt()
662+
r.cache.AddOperation(service, operationName, keyTTL)
663+
}
664+
return nil
665+
})
666+
}

plugin/storage/badger/spanstore/rw_internal_test.go

+47-7
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ func TestEncodingTypes(t *testing.T) {
2323
runWithBadger(t, func(store *badger.DB, t *testing.T) {
2424
testSpan := createDummySpan()
2525

26-
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
26+
cache := NewCacheStore(store, time.Duration(1*time.Hour))
2727
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
28-
rw := NewTraceReader(store, cache)
28+
rw := NewTraceReader(store, cache, true)
2929

3030
sw.encodingType = jsonEncoding
3131
err := sw.WriteSpan(context.Background(), &testSpan)
@@ -40,7 +40,7 @@ func TestEncodingTypes(t *testing.T) {
4040
runWithBadger(t, func(store *badger.DB, t *testing.T) {
4141
testSpan := createDummySpan()
4242

43-
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
43+
cache := NewCacheStore(store, time.Duration(1*time.Hour))
4444
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
4545
// rw := NewTraceReader(store, cache)
4646

@@ -53,9 +53,9 @@ func TestEncodingTypes(t *testing.T) {
5353
runWithBadger(t, func(store *badger.DB, t *testing.T) {
5454
testSpan := createDummySpan()
5555

56-
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
56+
cache := NewCacheStore(store, time.Duration(1*time.Hour))
5757
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
58-
rw := NewTraceReader(store, cache)
58+
rw := NewTraceReader(store, cache, true)
5959

6060
err := sw.WriteSpan(context.Background(), &testSpan)
6161
require.NoError(t, err)
@@ -92,9 +92,9 @@ func TestDecodeErrorReturns(t *testing.T) {
9292
func TestDuplicateTraceIDDetection(t *testing.T) {
9393
runWithBadger(t, func(store *badger.DB, t *testing.T) {
9494
testSpan := createDummySpan()
95-
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
95+
cache := NewCacheStore(store, time.Duration(1*time.Hour))
9696
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour))
97-
rw := NewTraceReader(store, cache)
97+
rw := NewTraceReader(store, cache, true)
9898
origStartTime := testSpan.StartTime
9999

100100
traceCount := 128
@@ -189,3 +189,43 @@ func TestMergeJoin(t *testing.T) {
189189
chk.Len(merged, 2)
190190
chk.Equal(uint32(2), binary.BigEndian.Uint32(merged[1]))
191191
}
192+
193+
func TestOldReads(t *testing.T) {
194+
runWithBadger(t, func(store *badger.DB, t *testing.T) {
195+
timeNow := model.TimeAsEpochMicroseconds(time.Now())
196+
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
197+
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})
198+
199+
tid := time.Now().Add(1 * time.Minute)
200+
201+
writer := func() {
202+
store.Update(func(txn *badger.Txn) error {
203+
txn.SetEntry(&badger.Entry{
204+
Key: s1Key,
205+
ExpiresAt: uint64(tid.Unix()),
206+
})
207+
txn.SetEntry(&badger.Entry{
208+
Key: s1o1Key,
209+
ExpiresAt: uint64(tid.Unix()),
210+
})
211+
return nil
212+
})
213+
}
214+
215+
cache := NewCacheStore(store, time.Duration(-1*time.Hour))
216+
writer()
217+
218+
nuTid := tid.Add(1 * time.Hour)
219+
220+
cache.Update("service1", "operation1", uint64(tid.Unix()))
221+
cache.services["service1"] = uint64(nuTid.Unix())
222+
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())
223+
224+
// This is equivalent to the former cache.populateCaches() call
225+
_ = NewTraceReader(store, cache, true)
226+
227+
// Now make sure we didn't use the older timestamps from the DB
228+
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
229+
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
230+
})
231+
}

0 commit comments

Comments
 (0)