diff --git a/posting/index.go b/posting/index.go index 9517c6750b3..38d3e4c6094 100644 --- a/posting/index.go +++ b/posting/index.go @@ -239,7 +239,7 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List, plist.Lock() defer plist.Unlock() if hasCountIndex { - countBefore, found, _ = plist.getPostingAndLength(txn.StartTs, 0, edge.ValueId) + countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId) if countBefore == -1 { return emptyCountParams, ErrTsTooOld } diff --git a/posting/list.go b/posting/list.go index 4dae3ce52fd..4457fce7cbe 100644 --- a/posting/list.go +++ b/posting/list.go @@ -556,6 +556,22 @@ func (l *List) getMutation(startTs uint64) []byte { return nil } +func (l *List) setMutationAfterCommit(startTs, commitTs uint64, data []byte) { + pl := new(pb.PostingList) + x.Check(pl.Unmarshal(data)) + pl.CommitTs = commitTs + + l.Lock() + if l.mutationMap == nil { + l.mutationMap = make(map[uint64]*pb.PostingList) + } + l.mutationMap[startTs] = pl + if pl.CommitTs != 0 { + l.maxTs = x.Max(l.maxTs, pl.CommitTs) + } + l.Unlock() +} + func (l *List) setMutation(startTs uint64, data []byte) { pl := new(pb.PostingList) x.Check(pl.Unmarshal(data)) @@ -565,6 +581,9 @@ func (l *List) setMutation(startTs uint64, data []byte) { l.mutationMap = make(map[uint64]*pb.PostingList) } l.mutationMap[startTs] = pl + if pl.CommitTs != 0 { + l.maxTs = x.Max(l.maxTs, pl.CommitTs) + } l.Unlock() } @@ -649,11 +668,64 @@ func (l *List) pickPostings(readTs uint64) (uint64, []*pb.Posting) { return deleteBelowTs, posts } +func (l *List) iterateNoSort(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { + l.AssertRLock() + + effective := func(start, commit uint64) uint64 { + if commit > 0 && commit <= readTs { + // Has been committed and below the readTs. + return commit + } + if start == readTs { + // This mutation is by ME. So, I must be able to read it. + return start + } + return 0 + } + + // First pick up the postings. + var deleteBelowTs uint64 + var posts []*pb.Posting + for startTs, plist := range l.mutationMap { + // Pick up the transactions which are either committed, or the one which is ME. + effectiveTs := effective(startTs, plist.CommitTs) + if effectiveTs > deleteBelowTs { + // We're above the deleteBelowTs marker. We wouldn't reach here if effectiveTs is zero. + for _, mpost := range plist.Postings { + if hasDeleteAll(mpost) { + deleteBelowTs = effectiveTs + continue + } + posts = append(posts, mpost) + } + } + } + + if deleteBelowTs > 0 { + // There was a delete all marker. So, trim down the list of postings. + result := posts[:0] + for _, post := range posts { + effectiveTs := effective(post.StartTs, post.CommitTs) + if effectiveTs < deleteBelowTs { // Do pick the posts at effectiveTs == deleteBelowTs. + continue + } + result = append(result, post) + } + posts = result + } + + return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, posts) +} + func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error) error { l.AssertRLock() // mposts is the list of mutable postings deleteBelowTs, mposts := l.pickPostings(readTs) + return l.iterateInternal(readTs, afterUid, f, deleteBelowTs, mposts) +} + +func (l *List) iterateInternal(readTs uint64, afterUid uint64, f func(obj *pb.Posting) error, deleteBelowTs uint64, mposts []*pb.Posting) error { if readTs < l.minTs { return errors.Errorf("readTs: %d less than minTs: %d for key: %q", readTs, l.minTs, l.key) } @@ -762,6 +834,26 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) { return count == 0, nil } +func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { + l.AssertRLock() + var count int + var found bool + var post *pb.Posting + err := l.iterateNoSort(readTs, afterUid, func(p *pb.Posting) error { + if p.Uid == uid { + post = p + found = true + } + count++ + return nil + }) + if err != nil { + return -1, false, nil + } + + return count, found, post +} + func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) { l.AssertRLock() var count int diff --git a/posting/lists.go b/posting/lists.go index 5f69c6df7df..0b587a97e26 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -107,7 +107,8 @@ func GetNoStore(key []byte, readTs uint64) (rlist *List, err error) { type LocalCache struct { sync.RWMutex - startTs uint64 + startTs uint64 + commitTs uint64 // The keys for these maps is a string representation of the Badger key for the posting list. // deltas keep track of the updates made by txn. These must be kept around until written to disk @@ -175,6 +176,12 @@ func NoCache(startTs uint64) *LocalCache { return &LocalCache{startTs: startTs} } +func (lc *LocalCache) UpdateCommitTs(commitTs uint64) { + lc.Lock() + defer lc.Unlock() + lc.commitTs = commitTs +} + func (lc *LocalCache) Find(pred []byte, filter func([]byte) bool) (uint64, error) { txn := pstore.NewTransactionAt(lc.startTs, false) defer txn.Discard() @@ -318,7 +325,11 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) // apply it before returning the list. lc.RLock() if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) + if lc.commitTs == 0 { + pl.setMutation(lc.startTs, delta) + } else { + pl.setMutationAfterCommit(lc.startTs, lc.commitTs, delta) + } } lc.RUnlock() return lc.SetIfAbsent(skey, pl), nil diff --git a/posting/mvcc.go b/posting/mvcc.go index a29bab6bb87..91448ec1f24 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -19,8 +19,6 @@ package posting import ( "bytes" "encoding/hex" - "fmt" - "math" "strconv" "sync" "sync/atomic" @@ -109,7 +107,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { } } - l, err := GetNoStore(key, math.MaxUint64) + l, err := GetNoStore(key, ts) if err != nil { return err } @@ -119,9 +117,18 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { return err } + if len(kvs) > 0 { + pl := new(pb.PostingList) + x.Check(pl.Unmarshal(kvs[0].Value)) + l.Lock() + l.plist = pl + l.mutationMap = nil + l.maxTs = kvs[0].Version + l.Unlock() + } + l.RLock() - fmt.Println("Rolling up") - lCache.Set(key, copyList(l), int64(l.DeepSize())) + lCache.Set(key, l, int64(l.DeepSize())) l.RUnlock() // If we do a rollup, we typically won't need to update the key in cache. @@ -487,7 +494,10 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { l := val.(*List) // l.maxTs can be greater than readTs. We might have the latest // version cached, while readTs is looking for an older version. - if l != nil && l.maxTs >= readTs { + if l != nil { + //fmt.Println(l.maxTs, readTs) + } + if l != nil && l.maxTs > 0 { l.RLock() lCopy := copyList(l) l.RUnlock() @@ -518,6 +528,7 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { if readTs >= l.maxTs { l.RLock() defer l.RUnlock() + //fmt.Println("Setting", l.maxTs) lCache.Set(key, copyList(l), int64(l.DeepSize())) } return l, nil