Skip to content

Commit

Permalink
fix(core): fix duplicate mutation entries for count index (#9208)
Browse files Browse the repository at this point in the history
Title format:

 - `Topic(Area): Feature`
- `Topic` must be one of
`build|ci|docs|feat|fix|perf|refactor|chore|test`
- `Area` must be one of
`acl|audit|backup|badger|cdc|dql|export|graphql|indexing|multi-tenancy|raft|restore|upgrade|zero`
- Add `[BREAKING]` if it is a breaking change

------------------
Body Format:

Description: <add description>
Fixes: <GitHub Issue>
Closes: <JIRA Issue>
Docs: <docs PR>
  • Loading branch information
harshil-goel authored Nov 1, 2024
1 parent adfd278 commit 237dd74
Show file tree
Hide file tree
Showing 10 changed files with 633 additions and 400 deletions.
12 changes: 11 additions & 1 deletion dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,10 @@ func lookup(db *badger.DB) {
if err != nil {
log.Fatal(err)
}
fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
pl.RLock()
c, _, _ := pl.GetLength(math.MaxUint64)
pl.RUnlock()
fmt.Fprintf(&buf, " Length: %d", c)

splits := pl.PartSplits()
isMultiPart := len(splits) > 0
Expand Down Expand Up @@ -611,6 +614,13 @@ func printKeys(db *badger.DB) {
}

var sz, deltaCount int64
pl, err := posting.GetNew(key, db, opt.readTs)
if err == nil {
pl.RLock()
c, _, _ := pl.GetLength(math.MaxUint64)
fmt.Fprintf(&buf, " countValue: [%d]", c)
pl.RUnlock()
}
LOOP:
for ; itr.ValidForPrefix(prefix); itr.Next() {
item := itr.Item()
Expand Down
77 changes: 36 additions & 41 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo)
// that we care about just data[0].
// Similarly, the current assumption is that we have at most one
// Vector Index, but this assumption may break later.
if info.op == pb.DirectedEdge_SET &&
if info.op != pb.DirectedEdge_DEL &&
len(data) > 0 && data[0].Tid == types.VFloatID &&
len(info.factorySpecs) > 0 {
// retrieve vector from inUuid save as inVec
Expand Down Expand Up @@ -236,6 +236,18 @@ type countParams struct {
reverse bool
}

// shouldAddCountEdge reports whether a mutation for edge must still be applied
// to the posting list when the predicate has a count index.
//
// Side effect: when the posting already exists (found) and the op is not a
// delete, edge.Op is rewritten from SET to OVR. An overwrite is treated as
// count-neutral by the length computation (a plain SET would add a duplicate
// +1 entry and corrupt the count index — see Set/Ovr in posting/list.go).
func shouldAddCountEdge(found bool, edge *pb.DirectedEdge) bool {
	if found {
		// Re-setting an existing posting: keep the mutation, but mark it
		// as an overwrite so it does not change the count.
		if edge.Op != pb.DirectedEdge_DEL {
			edge.Op = pb.DirectedEdge_OVR
		}
		return true
	}
	// Posting is absent: deleting it would be a no-op, so only apply
	// non-delete mutations.
	return edge.Op != pb.DirectedEdge_DEL
}

func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) {
countBefore, countAfter := 0, 0
Expand All @@ -245,12 +257,14 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
defer plist.Unlock()
if hasCountIndex {
countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId)
if countBefore == -1 {
if countBefore < 0 {
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
}
}
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
}
}
if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, edge.Op)
Expand Down Expand Up @@ -314,7 +328,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
// entries for this key in the index are removed.
pred, ok := schema.State().Get(ctx, t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
t.Op == pb.DirectedEdge_SET && t.ValueId != 0
t.Op != pb.DirectedEdge_DEL && t.ValueId != 0
if isSingleUidUpdate {
dataKey := x.DataKey(t.Attr, t.Entity)
dataList, err := getFn(dataKey)
Expand Down Expand Up @@ -461,7 +475,7 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
}

func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
if !found && op == pb.DirectedEdge_SET {
if !found && op != pb.DirectedEdge_DEL {
return countBefore + 1
} else if found && op == pb.DirectedEdge_DEL {
return countBefore - 1
Expand Down Expand Up @@ -534,8 +548,10 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
}
}

if err = l.addMutationInternal(ctx, txn, t); err != nil {
return val, found, emptyCountParams, err
if !(hasCountIndex && !shouldAddCountEdge(found && currPost.Op != Del, t)) {
if err = l.addMutationInternal(ctx, txn, t); err != nil {
return val, found, emptyCountParams, err
}
}

if found && doUpdateIndex {
Expand Down Expand Up @@ -599,7 +615,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
return err
}
}
if edge.Op == pb.DirectedEdge_SET {
if edge.Op != pb.DirectedEdge_DEL {
val = types.Val{
Tid: types.TypeID(edge.ValueType),
Value: edge.Value,
Expand Down Expand Up @@ -898,15 +914,13 @@ func (r *rebuilder) Run(ctx context.Context) error {
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

var txn *Txn

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
//TODO We need to create a single transaction irrespective of the type of the predicate
if pred.ValueType == pb.Posting_VFLOAT {
txn = NewTxn(r.startTs)
x.AssertTrue(false)
}
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand All @@ -926,44 +940,25 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, errors.Wrapf(err, "error reading posting list from disk")
}

// We are using different transactions in each call to KeyToList function. This could
// be a problem for computing reverse count indexes if deltas for same key are added
// in different transactions. Such a case doesn't occur for now.
// TODO: Maybe we can always use txn initialized in rebuilder.Run().
streamTxn := txn
if streamTxn == nil {
streamTxn = NewTxn(r.startTs)
}
edges, err := r.fn(pk.Uid, l, streamTxn)
kvs, err := l.Rollup(nil, r.startTs)
if err != nil {
return nil, err
}

if txn != nil {
kvs := make([]*bpb.KV, 0, len(edges))
for _, edge := range edges {
version := atomic.AddUint64(&counter, 1)
key := x.DataKey(edge.Attr, edge.Entity)
pl, err := txn.GetFromDelta(key)
if err != nil {
return &bpb.KVList{}, nil
}
data := pl.getMutation(r.startTs)
kv := bpb.KV{
Key: x.DataKey(edge.Attr, edge.Entity),
Value: data,
UserMeta: []byte{BitDeltaPosting},
Version: version,
}
kvs = append(kvs, &kv)
}
return &bpb.KVList{Kv: kvs}, nil
for _, kv := range kvs {
version := atomic.AddUint64(&counter, 1)
kv.Version = version
}

streamTxn := NewTxn(r.startTs)
_, err = r.fn(pk.Uid, l, streamTxn)
if err != nil {
return nil, err
}

// Convert data into deltas.
streamTxn.Update()
// txn.cache.Lock() is not required because we are the only one making changes to txn.
kvs := make([]*bpb.KV, 0, len(streamTxn.cache.deltas))
for key, data := range streamTxn.cache.deltas {
version := atomic.AddUint64(&counter, 1)
kv := bpb.KV{
Expand Down
31 changes: 24 additions & 7 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ var (
)

const (
// Set means overwrite in mutation layer. It contributes 0 in Length.
// Set means set in mutation layer. It contributes 1 in Length.
Set uint32 = 0x01
// Del means delete in mutation layer. It contributes -1 in Length.
Del uint32 = 0x02
// Ovr means overwrite in mutation layer. It contributes 0 in Length.
Ovr uint32 = 0x03

// BitSchemaPosting signals that the value stores a schema or type.
BitSchemaPosting byte = 0x01
Expand Down Expand Up @@ -305,6 +307,8 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
switch t.Op {
case pb.DirectedEdge_SET:
op = Set
case pb.DirectedEdge_OVR:
op = Set
case pb.DirectedEdge_DEL:
op = Del
default:
Expand Down Expand Up @@ -340,7 +344,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)

// If we have a delete all, then we replace the map entry with just one.
if hasDeleteAll(mpost) {
Expand Down Expand Up @@ -529,7 +533,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
}
pred, ok := schema.State().Get(ctx, t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op == Set && mpost.PostingType == pb.Posting_REF
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
Expand All @@ -555,6 +559,10 @@ func (l *List) getPosting(startTs uint64) *pb.PostingList {
return nil
}

// GetPosting is the exported wrapper around getPosting: it returns the
// in-memory posting-list delta stored for startTs, or nil if none exists.
// NOTE(review): presumably exported for use outside the posting package
// (e.g. debug tooling) — callers elsewhere in this file are unexported.
func (l *List) GetPosting(startTs uint64) *pb.PostingList {
return l.getPosting(startTs)
}

// getMutation returns a marshaled version of posting list mutation stored internally.
func (l *List) getMutation(startTs uint64) []byte {
l.RLock()
Expand Down Expand Up @@ -817,31 +825,40 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
return count == 0, nil
}

// GetLength returns the length of the posting list as visible at readTs,
// delegating to getPostingAndLengthNoSort with afterUid=0 and uid=0.
// The caller must hold l's read lock (the callee asserts it); see the
// pl.RLock()/pl.RUnlock() usage in dgraph/cmd/debug.
func (l *List) GetLength(readTs uint64) (int, bool, *pb.Posting) {
return l.getPostingAndLengthNoSort(readTs, 0, 0)
}

func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()

dec := codec.Decoder{Pack: l.plist.Pack}
uids := dec.Seek(uid, codec.SeekStart)
length := codec.ExactLen(l.plist.Pack)
found := len(uids) > 0 && uids[0] == uid
found_ts := uint64(0)

for _, plist := range l.mutationMap {
for _, mpost := range plist.Postings {
ts := mpost.CommitTs
if mpost.StartTs == readTs {
ts = mpost.StartTs
}
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
if hasDeleteAll(mpost) {
found = false
length = 0
continue
}
if mpost.Uid == uid {
found = (mpost.Op == Set)
if mpost.Uid == uid && found_ts < ts {
found = (mpost.Op != Del)
found_ts = ts
}
if mpost.Op == Set {
length += 1
} else {
} else if mpost.Op == Del {
length -= 1
}

}
}
}
Expand Down
4 changes: 4 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,10 @@ func PostingListCacheEnabled() bool {
return lCache != nil
}

// GetNew is the exported wrapper around getNew: it reads the posting list
// for key from pstore at readTs, consulting the posting-list cache when
// enabled. Exported so packages outside posting (e.g. the debug command)
// can fetch a list.
func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
return getNew(key, pstore, readTs)
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if PostingListCacheEnabled() {
l, ok := lCache.Get(key)
Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ message DirectedEdge {
enum Op {
SET = 0;
DEL = 1;
OVR = 2;
}
Op op = 8;
repeated api.Facet facets = 9;
Expand Down
Loading

0 comments on commit 237dd74

Please sign in to comment.