Skip to content

Commit

Permalink
Fixed commits
Browse files Browse the repository at this point in the history
  • Loading branch information
harshil-goel committed May 20, 2024
1 parent fab7c2d commit 7fcf01c
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 85 deletions.
45 changes: 4 additions & 41 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,11 +679,6 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e

// mposts is the list of mutable postings
deleteBelowTs, mposts := l.pickPostings(readTs)
return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, mposts)
}

func (l *List) iterateInternal(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error, deleteBelowTs uint64,
mposts []*pb.Posting) error {
if readTs < l.minTs {
return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key)
}
Expand Down Expand Up @@ -818,18 +813,18 @@ func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, boo
dec := codec.Decoder{Pack: l.plist.Pack}
uids := dec.Seek(uid, codec.SeekStart)
length := codec.ExactLen(l.plist.Pack)
found1 := len(uids) > 0 && uids[0] == uid
found := len(uids) > 0 && uids[0] == uid

for _, plist := range l.mutationMap {
for _, mpost := range plist.Postings {
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
if hasDeleteAll(mpost) {
found1 = false
found = false
length = 0
continue
}
if mpost.Uid == uid {
found1 = (mpost.Op == Set)
found = (mpost.Op == Set)
}
if mpost.Op == Set {
length += 1
Expand All @@ -841,7 +836,7 @@ func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, boo
}
}

return length, found1, nil
return length, found, nil
}

func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
Expand Down Expand Up @@ -877,38 +872,6 @@ func (l *List) length(readTs, afterUid uint64) int {
return count
}

// getPostingAndLengthNoSort reports the number of postings visible at readTs
// and whether uid is among them, without materialising a sorted view.
// The immutable layer is consulted through the encoded pack, then every
// posting in the mutation map that is visible at readTs adjusts the running
// count. afterUid is accepted for signature parity with getPostingAndLength
// but is not consulted in this variant. The *pb.Posting result is always nil.
func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
	l.AssertRLock()

	// Immutable layer: exact pack length plus a membership probe for uid.
	dec := codec.Decoder{Pack: l.plist.Pack}
	seeked := dec.Seek(uid, codec.SeekStart)
	count := codec.ExactLen(l.plist.Pack)
	present := len(seeked) > 0 && seeked[0] == uid

	// Mutable layer: fold in every posting visible to this read timestamp.
	for _, mutated := range l.mutationMap {
		for _, mpost := range mutated.Postings {
			committed := mpost.CommitTs > 0 && mpost.CommitTs <= readTs
			if !committed && mpost.StartTs != readTs {
				continue
			}
			if hasDeleteAll(mpost) {
				// A delete-all posting wipes everything accumulated so far.
				present = false
				count = 0
				continue
			}
			if mpost.Uid == uid {
				present = mpost.Op == Set
			}
			if mpost.Op == Set {
				count++
			} else {
				count--
			}
		}
	}

	return count, present, nil
}

// Length iterates over the mutation layer and counts number of elements.
func (l *List) Length(readTs, afterUid uint64) int {
l.RLock()
Expand Down
117 changes: 73 additions & 44 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ type incrRollupi struct {
closer *z.Closer
}

// CachePL is a single entry of the global posting-list cache (globalCache).
type CachePL struct {
	// count tracks outstanding references to this entry: incremented when
	// getNew hands the entry out, decremented when a transaction commits
	// deltas over the same key in UpdateCachedKeys.
	count int
	// list is the cached posting list; nil once the entry has been
	// invalidated by a write.
	list *List
	// lastUpdate is the commit timestamp of the most recent write seen for
	// this key; CachePL.Set refuses to repopulate list for reads at or
	// before it.
	lastUpdate uint64
}

type GlobalCache struct {
sync.RWMutex

count map[string]int
list map[string]*List
lastUpdate map[string]uint64
items map[string]*CachePL
}

var (
Expand All @@ -80,11 +84,7 @@ var (
priorityKeys: make([]*pooledKeys, 2),
}

globalCache = &GlobalCache{
count: make(map[string]int),
list: make(map[string]*List),
lastUpdate: make(map[string]uint64),
}
globalCache = &GlobalCache{items: make(map[string]*CachePL)}
)

func init() {
Expand Down Expand Up @@ -338,9 +338,7 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {

// ResetCache drops every entry from the global posting-list cache and clears
// lCache. The scraped diff had merged the removed resets of the old count/
// list/lastUpdate maps with the new items reset; only items exists on
// GlobalCache after this commit, so only it is re-made here.
func ResetCache() {
	globalCache.Lock()
	globalCache.items = make(map[string]*CachePL)
	globalCache.Unlock()
	lCache.Clear()
}
Expand All @@ -353,34 +351,38 @@ func (txn *Txn) UpdateCachedKeys(commitTs uint64) {

for key, delta := range txn.cache.deltas {
pk, _ := x.Parse([]byte(key))
if pk.IsData() {
if !ShouldGoInCache(pk) {
continue
}
globalCache.Lock()
val, ok := globalCache.items[key]
if !ok {
val = &CachePL{
count: 0,
list: nil,
lastUpdate: commitTs,
}
globalCache.items[key] = val
}
if commitTs != 0 {
// TODO Delete this if the values are too old in an async thread
globalCache.lastUpdate[key] = commitTs
val.lastUpdate = commitTs
}
val, ok := globalCache.count[key]
if !ok {
globalCache.Unlock()
continue
}
globalCache.count[key] = val - 1
if val == 1 {
delete(globalCache.count, key)
delete(globalCache.list, key)
val.count -= 1
if val.count == 1 {
val.list = nil
globalCache.Unlock()
continue
}

if commitTs != 0 {
pl, ok := globalCache.list[key]
if ok {
p := new(pb.PostingList)
x.Check(p.Unmarshal(delta))
pl.setMutationAfterCommit(txn.StartTs, commitTs, delta)
}
if commitTs != 0 && val.list != nil {
p := new(pb.PostingList)
x.Check(p.Unmarshal(delta))
val.list.setMutationAfterCommit(txn.StartTs, commitTs, delta)
}
globalCache.Unlock()
}
Expand Down Expand Up @@ -518,28 +520,50 @@ func copyList(l *List) *List {
return lCopy
}

// Set installs l as the cached list, but only when it is strictly fresher
// than what the entry already holds: no write at or after readTs may have
// been recorded, and l must carry a higher maxTs than any currently cached
// list. Otherwise the entry is left untouched.
func (c *CachePL) Set(l *List, readTs uint64) {
	if c.lastUpdate >= readTs {
		// A write at least as recent as this read exists; keep the cache as-is.
		return
	}
	if c.list != nil && c.list.maxTs >= l.maxTs {
		// The cached copy is already at least as new as l.
		return
	}
	c.list = l
}

// ShouldGoInCache reports whether postings under this key are eligible for
// the global cache; data keys are kept out of it.
func ShouldGoInCache(pk x.ParsedKey) bool {
	if pk.IsData() {
		return false
	}
	return true
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if pstore.IsClosed() {
return nil, badger.ErrDBClosed
}

pk, _ := x.Parse(key)
globalCache.Lock()
globalCache.count[string(key)]++

// We use badger subscription to invalidate the cache. For every write we make the value
// corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
// then it means that no writes have happened after the last set of this key in the cache.
if l, ok := globalCache.list[string(key)]; ok {
if l != nil && l.minTs <= readTs {
l.RLock()
lCopy := copyList(l)
l.RUnlock()
globalCache.Unlock()
return lCopy, nil

if ShouldGoInCache(pk) {
globalCache.Lock()
cacheItem, ok := globalCache.items[string(key)]
if !ok {
cacheItem = &CachePL{
count: 0,
list: nil,
lastUpdate: 0,
}
globalCache.items[string(key)] = cacheItem
}
cacheItem.count += 1

// We use badger subscription to invalidate the cache. For every write we make the value
// corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache
// then it means that no writes have happened after the last set of this key in the cache.
if ok {
if cacheItem.list != nil && cacheItem.list.minTs <= readTs {
cacheItem.list.RLock()
lCopy := copyList(cacheItem.list)
cacheItem.list.RUnlock()
globalCache.Unlock()
return lCopy, nil
}
}
globalCache.Unlock()
}
globalCache.Unlock()

txn := pstore.NewTransactionAt(readTs, false)
defer txn.Discard()
Expand All @@ -560,14 +584,19 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
// Only set l to the cache if readTs >= latestTs, which implies that l is
// the latest version of the PL. We also check that we're reading a version
// from Badger, which is higher than the write registered by the cache.
if !pk.IsData() && readTs >= l.maxTs {
if ShouldGoInCache(pk) {
globalCache.Lock()
l.RLock()
cacheList, ok := globalCache.list[string(key)]
if !ok || (ok && cacheList.maxTs < l.maxTs) {
if lastUpdateTs, k := globalCache.lastUpdate[string(key)]; !k || (k && lastUpdateTs < readTs) {
globalCache.list[string(key)] = copyList(l)
cacheItem, ok := globalCache.items[string(key)]
if !ok {
cacheItemNew := &CachePL{
count: 1,
list: copyList(l),
lastUpdate: l.maxTs,
}
globalCache.items[string(key)] = cacheItemNew
} else {
cacheItem.Set(copyList(l), readTs)
}
l.RUnlock()
globalCache.Unlock()
Expand Down

0 comments on commit 7fcf01c

Please sign in to comment.