Commit a2db186: fixed comments
harshil-goel committed Jan 7, 2025
1 parent 3fdd9ba

Showing 6 changed files with 78 additions and 64 deletions.
1 change: 1 addition & 0 deletions posting/list_test.go
@@ -519,6 +519,7 @@ func TestReadSingleValue(t *testing.T) {
Value: []byte(fmt.Sprintf("ho hey there%d", i)),
}
txn := Txn{StartTs: i}
ol.mutationMap.setTs(i)
addMutationHelper(t, ol, edge, Set, &txn)
require.NoError(t, ol.commitMutation(i, i+1))
kData := ol.getMutation(i + 1)
6 changes: 0 additions & 6 deletions posting/lists.go
@@ -46,12 +46,6 @@ func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) {
closer = z.NewCloser(1)
go x.MonitorMemoryMetrics(closer)

// Initialize cache.
if cacheSize == 0 {
memoryLayer = &MemoryLayer{}
return
}

memoryLayer = initMemoryLayer(cacheSize, keepUpdates)
}

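
The early return for cacheSize == 0 is gone because the zero-size case now lives inside the memory layer itself: initMemoryLayer (see posting/mvcc.go below) leaves the inner cache nil when cacheSize is 0, and every Cache method begins with a nil-receiver guard. A minimal sketch of that Go pattern, with toy types standing in for the real posting.Cache and MemoryLayer:

package main

import "fmt"

// cache is a toy stand-in for posting.Cache; a nil *cache disables caching.
type cache struct{ data map[string]string }

// get reports a miss when called on a nil receiver, so callers never
// have to check whether caching is enabled.
func (c *cache) get(key string) (string, bool) {
	if c == nil {
		return "", false
	}
	v, ok := c.data[key]
	return v, ok
}

// set is a no-op on a nil receiver.
func (c *cache) set(key, val string) {
	if c == nil {
		return
	}
	c.data[key] = val
}

type memoryLayer struct{ cache *cache }

// initMemoryLayer mirrors the new control flow: the layer is always
// constructed, and cacheSize == 0 simply leaves the inner cache nil.
func initMemoryLayer(cacheSize int64) *memoryLayer {
	ml := &memoryLayer{}
	if cacheSize > 0 {
		ml.cache = &cache{data: make(map[string]string)}
	}
	return ml
}

func main() {
	ml := initMemoryLayer(0)   // caching disabled
	ml.cache.set("k", "v")     // safe: nil receiver, no-op
	_, ok := ml.cache.get("k") // safe: nil receiver, always a miss
	fmt.Println(ok)            // false
}

Calling a method on a nil pointer is legal in Go as long as the method body never dereferences the receiver, which is exactly what these guards ensure.
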
128 changes: 73 additions & 55 deletions posting/mvcc.go
@@ -21,7 +21,6 @@ import (
"context"
"encoding/hex"
"math"
"runtime"
"strconv"
"sync"
"sync/atomic"
@@ -88,7 +87,6 @@ var (
var memoryLayer *MemoryLayer

func init() {
runtime.SetCPUProfileRate(200)
x.AssertTrue(len(IncrRollup.priorityKeys) == 2)
for i := range IncrRollup.priorityKeys {
IncrRollup.priorityKeys[i] = &pooledKeys{
@@ -348,82 +346,105 @@ func RemoveCacheFor(key []byte) {
memoryLayer.del(key)
}

type MemoryLayer struct {
cache *ristretto.Cache[[]byte, *CachePL]

keepUpdates bool
type Cache struct {
data *ristretto.Cache[[]byte, *CachePL]

numCacheRead int
numCacheReadFails int
numDisksRead int
numCacheSave int
}

func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer {
ml := &MemoryLayer{}
ml.keepUpdates = keepUpdates
cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 16,
Metrics: true,
Cost: func(val *CachePL) int64 {
return 1
},
ShouldUpdate: func(cur, prev *CachePL) bool {
return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs)
},
})
x.Check(err)
go func() {
m := cache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()
if cacheSize > 0 {
ml.cache = cache
func (c *Cache) wait() {
if c == nil {
return
}
return ml
c.data.Wait()
}

func (ml *MemoryLayer) get(key []byte) (*CachePL, bool) {
if ml.cache == nil {
func (c *Cache) get(key []byte) (*CachePL, bool) {
if c == nil {
return nil, false
}
val, ok := ml.cache.Get(key)
val, ok := c.data.Get(key)
if !ok {
c.numCacheReadFails += 1
return val, ok
}
if val.list == nil {
c.numCacheReadFails += 1
return nil, false
}
c.numCacheRead += 1
return val, true
}

func (ml *MemoryLayer) set(key []byte, i *CachePL) {
if ml.cache == nil {
func (c *Cache) set(key []byte, i *CachePL) {
if c == nil {
return
}
ml.cache.Set(key, i, 1)
c.numCacheSave += 1
c.data.Set(key, i, 1)
}

func (ml *MemoryLayer) del(key []byte) {
if ml.cache == nil {
func (c *Cache) del(key []byte) {
if c == nil {
return
}
ml.cache.Del(key)
c.data.Del(key)
}

func (ml *MemoryLayer) clear() {
if ml.cache == nil {
func (c *Cache) clear() {
if c == nil {
return
}
ml.cache.Clear()
c.data.Clear()
}

type MemoryLayer struct {
keepUpdates bool
cache *Cache

numDisksRead int
}

func (ml *MemoryLayer) clear() {
ml.cache.clear()
}
func (ml *MemoryLayer) del(key []byte) {
ml.cache.del(key)
}

func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer {
ml := &MemoryLayer{}
ml.keepUpdates = keepUpdates
if cacheSize > 0 {
cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(cacheSize) * 0.05 * 2),
MaxCost: int64(float64(cacheSize) * 0.95),
BufferItems: 16,
Metrics: true,
Cost: func(val *CachePL) int64 {
return 1
},
ShouldUpdate: func(cur, prev *CachePL) bool {
return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs)
},
})
x.Check(err)
go func() {
m := cache.Metrics
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Record the posting list cache hit ratio
ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio()))
}
}()

ml.cache = &Cache{data: cache}
}
return ml
}

func NewCachePL() *CachePL {
@@ -443,7 +464,7 @@ func checkForRollup(key []byte, l *List) {
}

func (ml *MemoryLayer) wait() {
ml.cache.Wait()
ml.cache.wait()
}

func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, commitTs uint64) {
@@ -457,7 +478,7 @@ func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, comm
return
}

val, ok := ml.get([]byte(key))
val, ok := ml.cache.get([]byte(key))
if !ok {
return
}
@@ -617,7 +638,7 @@ func (c *CachePL) Set(l *List, readTs uint64) {
}

func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List {
cacheItem, ok := ml.get(key)
cacheItem, ok := ml.cache.get(key)

if ok {
cacheItem.count += 1
@@ -661,7 +682,7 @@ func (ml *MemoryLayer) saveInCache(key []byte, l *List) {
cacheItem.count = 1
cacheItem.list = copyList(l)
cacheItem.lastUpdate = l.maxTs
ml.set(key, cacheItem)
ml.cache.set(key, cacheItem)
}

func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
@@ -670,11 +691,8 @@ func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*
// we would have to read the correct timestamp from the disk.
l := ml.readFromCache(key, readTs)
if l != nil {
ml.numCacheRead += 1
l.mutationMap.setTs(readTs)
return l, nil
} else {
ml.numCacheReadFails += 1
}
l, err := ml.readFromDisk(key, pstore, math.MaxUint64)
if err != nil {
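
One subtle piece of the relocated construction code above is the ShouldUpdate hook: an incoming cache entry is allowed to overwrite an existing one unless both hold a list and the cached list is strictly newer. A small restatement of that predicate with toy types, just to unpack the negated condition:

package main

import "fmt"

type list struct{ maxTs uint64 }

type cachePL struct{ list *list }

// shouldUpdate restates the ristretto ShouldUpdate hook from this diff:
// reject the overwrite only when both entries hold a list and the cached
// (prev) one has a strictly higher maxTs than the incoming (cur) one.
func shouldUpdate(cur, prev *cachePL) bool {
	return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs)
}

func main() {
	newer := &cachePL{list: &list{maxTs: 10}}
	older := &cachePL{list: &list{maxTs: 5}}
	fmt.Println(shouldUpdate(newer, older))      // true: incoming is newer, overwrite
	fmt.Println(shouldUpdate(older, newer))      // false: cached entry is newer, keep it
	fmt.Println(shouldUpdate(&cachePL{}, newer)) // true: incoming has no list to compare
}

In other words, the cache only refuses an update that would replace a newer posting list with an older one.
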
3 changes: 2 additions & 1 deletion posting/oracle.go
@@ -105,8 +105,9 @@ func (vt *viTxn) GetWithLockHeld(key []byte) (rval index.Value, rerr error) {
func (vt *viTxn) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) {
value := pl.findStaticValue(vt.delegate.StartTs)

// When the posting is deleted, we find the key in badger but there are no postings in it. This
// should also return ErrKeyNotFound, as that is what we expect in the later functions.
if value == nil || len(value.Postings) == 0 {
//fmt.Println("DIFF", val, err, nil, badger.ErrKeyNotFound)
return nil, ErrNoValue
}

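
The added comment documents the guard's intent: a key that still exists in badger but carries zero postings is a deleted value, and callers should see it exactly as if the key were never written. A toy illustration of that convention (valueOf and errNoValue are hypothetical stand-ins for the real lookup and error):

package main

import (
	"errors"
	"fmt"
)

var errNoValue = errors.New("no value found")

// valueOf mirrors the added guard in GetValueFromPostingList: a record
// found with zero postings (a deleted value) is reported with the same
// error as a record that was never written at all.
func valueOf(postings []string) (string, error) {
	if len(postings) == 0 {
		return "", errNoValue
	}
	return postings[0], nil
}

func main() {
	fmt.Println(valueOf(nil))            // deleted or missing: errNoValue
	fmt.Println(valueOf([]string{"v1"})) // live value: v1
}
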
2 changes: 1 addition & 1 deletion worker/draft.go
@@ -357,7 +357,7 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr
return err
}

// TODO: Revisit this when we work on posting cache. Clear entire cache.
// TODO: Revisit this when we work on posting cache. Don't clear entire cache.
// We don't want to drop entire cache, just due to one namespace.
posting.ResetCache()
return nil
2 changes: 1 addition & 1 deletion worker/worker_test.go
@@ -500,7 +500,7 @@ func TestMain(m *testing.M) {
x.Check(err)
pstore = ps
// Not using posting list cache
posting.Init(ps, 0)
posting.Init(ps, 0, false)
Init(ps)

m.Run()
