diff --git a/posting/list_test.go b/posting/list_test.go
index 3fa4b506778..340ea617b26 100644
--- a/posting/list_test.go
+++ b/posting/list_test.go
@@ -519,6 +519,7 @@ func TestReadSingleValue(t *testing.T) {
 			Value: []byte(fmt.Sprintf("ho hey there%d", i)),
 		}
 		txn := Txn{StartTs: i}
+		ol.mutationMap.setTs(i)
 		addMutationHelper(t, ol, edge, Set, &txn)
 		require.NoError(t, ol.commitMutation(i, i+1))
 		kData := ol.getMutation(i + 1)
diff --git a/posting/lists.go b/posting/lists.go
index a43e2b82092..e7172ed4a78 100644
--- a/posting/lists.go
+++ b/posting/lists.go
@@ -46,12 +46,6 @@ func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
 	closer = z.NewCloser(1)
 	go x.MonitorMemoryMetrics(closer)
 
-	// Initialize cache.
-	if cacheSize == 0 {
-		memoryLayer = &MemoryLayer{}
-		return
-	}
-
 	memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
 }
 
diff --git a/posting/mvcc.go b/posting/mvcc.go
index 1b32ac86c38..f7682eb56a9 100644
--- a/posting/mvcc.go
+++ b/posting/mvcc.go
@@ -21,7 +21,6 @@ import (
 	"context"
 	"encoding/hex"
 	"math"
-	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -88,7 +87,6 @@ var (
 var memoryLayer *MemoryLayer
 
 func init() {
-	runtime.SetCPUProfileRate(200)
 	x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
 	for i := range IncrRollup.priorityKeys {
 		IncrRollup.priorityKeys[i] = &pooledKeys{
@@ -348,82 +346,105 @@ func RemoveCacheFor(key []byte) {
 	memoryLayer.del(key)
 }
 
-type MemoryLayer struct {
-	cache *ristretto.Cache[[]byte, *CachePL]
-
-	keepUpdates bool
+type Cache struct {
+	data *ristretto.Cache[[]byte, *CachePL]
 
 	numCacheRead      int
 	numCacheReadFails int
-	numDisksRead      int
 	numCacheSave      int
 }
 
-func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer {
-	ml := &MemoryLayer{}
-	ml.keepUpdates = keepUpdates
-	cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
-		// Use 5% of cache memory for storing counters.
-		NumCounters: int64(float64(cacheSize) * 0.05 * 2),
-		MaxCost:     int64(float64(cacheSize) * 0.95),
-		BufferItems: 16,
-		Metrics:     true,
-		Cost: func(val *CachePL) int64 {
-			return 1
-		},
-		ShouldUpdate: func(cur, prev *CachePL) bool {
-			return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs)
-		},
-	})
-	x.Check(err)
-	go func() {
-		m := cache.Metrics
-		ticker := time.NewTicker(10 * time.Second)
-		defer ticker.Stop()
-		for range ticker.C {
-			// Record the posting list cache hit ratio
-			ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
-		}
-	}()
-	if cacheSize > 0 {
-		ml.cache = cache
+func (c *Cache) wait() {
+	if c == nil {
+		return
 	}
-	return ml
+	c.data.Wait()
 }
 
-func (ml *MemoryLayer) get(key []byte) (*CachePL, bool) {
-	if ml.cache == nil {
+func (c *Cache) get(key []byte) (*CachePL, bool) {
+	if c == nil {
 		return nil, false
 	}
-	val, ok := ml.cache.Get(key)
+	val, ok := c.data.Get(key)
 	if !ok {
+		c.numCacheReadFails += 1
 		return val, ok
 	}
 	if val.list == nil {
+		c.numCacheReadFails += 1
 		return nil, false
 	}
+	c.numCacheRead += 1
 	return val, true
 }
 
-func (ml *MemoryLayer) set(key []byte, i *CachePL) {
-	if ml.cache == nil {
+func (c *Cache) set(key []byte, i *CachePL) {
+	if c == nil {
 		return
 	}
-	ml.cache.Set(key, i, 1)
+	c.numCacheSave += 1
+	c.data.Set(key, i, 1)
 }
 
-func (ml *MemoryLayer) del(key []byte) {
-	if ml.cache == nil {
+func (c *Cache) del(key []byte) {
+	if c == nil {
 		return
 	}
-	ml.cache.Del(key)
+	c.data.Del(key)
 }
 
-func (ml *MemoryLayer) clear() {
-	if ml.cache == nil {
+func (c *Cache) clear() {
+	if c == nil {
 		return
 	}
-	ml.cache.Clear()
+	c.data.Clear()
+}
+
+type MemoryLayer struct {
+	keepUpdates bool
+	cache       *Cache
+
+	numDisksRead int
+}
+
+func (ml *MemoryLayer) clear() {
+	ml.cache.clear()
+}
+func (ml *MemoryLayer) del(key []byte) {
+	ml.cache.del(key)
+}
+
+func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer {
+	ml := &MemoryLayer{}
+	ml.keepUpdates = keepUpdates
+	if cacheSize > 0 {
+		cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
+			// Use 5% of cache memory for storing counters.
+			NumCounters: int64(float64(cacheSize) * 0.05 * 2),
+			MaxCost:     int64(float64(cacheSize) * 0.95),
+			BufferItems: 16,
+			Metrics:     true,
+			Cost: func(val *CachePL) int64 {
+				return 1
+			},
+			ShouldUpdate: func(cur, prev *CachePL) bool {
+				return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs)
+			},
+		})
+		x.Check(err)
+		go func() {
+			m := cache.Metrics
+			ticker := time.NewTicker(10 * time.Second)
+			defer ticker.Stop()
+			for range ticker.C {
+				// Record the posting list cache hit ratio
+				ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
+			}
+		}()
+
+		ml.cache = &Cache{data: cache}
+	}
+	return ml
 }
 
 func NewCachePL() *CachePL {
@@ -443,7 +464,7 @@ func checkForRollup(key []byte, l *List) {
 }
 
 func (ml *MemoryLayer) wait() {
-	ml.cache.Wait()
+	ml.cache.wait()
 }
 
 func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, commitTs uint64) {
@@ -457,7 +478,7 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm
 		return
 	}
 
-	val, ok := ml.get([]byte(key))
+	val, ok := ml.cache.get([]byte(key))
 	if !ok {
 		return
 	}
@@ -617,7 +638,7 @@ func (c *CachePL) Set(l *List, readTs uint64) {
 }
 
 func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List {
-	cacheItem, ok := ml.get(key)
+	cacheItem, ok := ml.cache.get(key)
 
 	if ok {
 		cacheItem.count += 1
@@ -661,7 +682,7 @@ func (ml *MemoryLayer) saveInCache(key []byte, l *List) {
 	cacheItem.count = 1
 	cacheItem.list = copyList(l)
 	cacheItem.lastUpdate = l.maxTs
-	ml.set(key, cacheItem)
+	ml.cache.set(key, cacheItem)
 }
 
 func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
@@ -670,11 +691,8 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
 	// we would have to read the correct timestamp from the disk.
 	l := ml.readFromCache(key, readTs)
 	if l != nil {
-		ml.numCacheRead += 1
 		l.mutationMap.setTs(readTs)
 		return l, nil
-	} else {
-		ml.numCacheReadFails += 1
 	}
 	l, err := ml.readFromDisk(key, pstore, math.MaxUint64)
 	if err != nil {
diff --git a/posting/oracle.go b/posting/oracle.go
index c74190da8ce..f2da499a41a 100644
--- a/posting/oracle.go
+++ b/posting/oracle.go
@@ -105,8 +105,9 @@ func (vt *viTxn) GetWithLockHeld(key []byte) (rval index.Value, rerr error) {
 
 func (vt *viTxn) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
 	value := pl.findStaticValue(vt.delegate.StartTs)
+	// When the posting is deleted, we find the key in badger, but there are no postings in it. This should
+	// also return ErrKeyNotFound, as that is what we expect in the later functions.
 	if value == nil || len(value.Postings) == 0 {
-		//fmt.Println("DIFF", val, err, nil, badger.ErrKeyNotFound)
 		return nil, ErrNoValue
 	}
 
diff --git a/worker/draft.go b/worker/draft.go
index 5e8225554b9..5565a769fff 100644
--- a/worker/draft.go
+++ b/worker/draft.go
@@ -357,7 +357,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
 		return err
 	}
 
-	// TODO: Revisit this when we work on posting cache. Clear entire cache.
+	// TODO: Revisit this when we work on posting cache. Don't clear entire cache.
 	// We don't want to drop entire cache, just due to one namespace.
 	posting.ResetCache()
 	return nil
diff --git a/worker/worker_test.go b/worker/worker_test.go
index 047e0ae6861..16be1acb1fe 100644
--- a/worker/worker_test.go
+++ b/worker/worker_test.go
@@ -500,7 +500,7 @@ func TestMain(m *testing.M) {
 	x.Check(err)
 	pstore = ps
 	// Not using posting list cache
-	posting.Init(ps, 0)
+	posting.Init(ps, 0, false)
 	Init(ps)
 
 	m.Run()
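
A note on the pattern above: every method of the new `Cache` type starts with an `if c == nil` check, so a `MemoryLayer` built with caching disabled can simply carry a nil `*Cache` instead of the old `cacheSize == 0` special case in `posting.Init`. Below is a minimal, self-contained sketch of that nil-receiver idiom; the names (`nilSafeCache`, `get`, `set`) are illustrative only and not part of this change, whose real `Cache` wraps a ristretto cache and also tracks hit/miss counters.

```go
// Illustrative sketch only: a nil-safe cache wrapper. Calling methods on a
// nil pointer is fine in Go as long as the method never dereferences the
// receiver, which each method here guards against explicitly.
package main

import "fmt"

type nilSafeCache struct {
	data map[string]string
}

// get reports a miss when called on a nil receiver, so a disabled cache
// (a nil pointer) behaves like an always-empty one.
func (c *nilSafeCache) get(key string) (string, bool) {
	if c == nil {
		return "", false
	}
	v, ok := c.data[key]
	return v, ok
}

// set is a no-op on a nil receiver.
func (c *nilSafeCache) set(key, val string) {
	if c == nil {
		return
	}
	c.data[key] = val
}

func main() {
	var disabled *nilSafeCache // cache turned off: no allocation needed
	disabled.set("k", "v")
	fmt.Println(disabled.get("k")) // prints: "" false

	enabled := &nilSafeCache{data: make(map[string]string)}
	enabled.set("k", "v")
	fmt.Println(enabled.get("k")) // prints: "v" true
}
```

Keeping the nil check inside the wrapper means call sites such as `ml.cache.get(...)` and `ml.cache.set(...)` stay identical whether or not a cache was configured.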