From 16f36d310a402602bb6498f54a766050b91acbe8 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 27 Sep 2024 07:53:05 +0530 Subject: [PATCH] added sharded map --- dgraph/cmd/alpha/run.go | 6 +- dgraph/cmd/debug/run.go | 2 +- go.mod | 2 +- go.sum | 4 +- posting/index.go | 1 + posting/index_test.go | 3 + posting/list.go | 18 +- posting/list_test.go | 6 +- posting/lists.go | 43 +--- posting/mvcc.go | 349 ++++++++++++++++++----------- posting/mvcc_test.go | 64 +++++- posting/oracle.go | 5 +- systest/21million/bulk/run_test.go | 49 +++- worker/config.go | 5 + worker/draft.go | 4 +- worker/mutation_unit_test.go | 2 +- worker/online_restore.go | 1 + worker/sort_test.go | 28 ++- worker/worker_test.go | 3 +- x/keys.go | 2 +- 20 files changed, 407 insertions(+), 190 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 3d2855d674d..f4565a07f62 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -145,6 +145,8 @@ they form a Raft group and provide synchronous replication. Flag("percentage", "Cache percentages summing up to 100 for various caches (FORMAT: PostingListCache,"+ "PstoreBlockCache,PstoreIndexCache)"). + Flag("keep-updates", + "Should carry updates in cache or not (bool)"). String()) flag.String("raft", worker.RaftDefaults, z.NewSuperFlagHelp(worker.RaftDefaults). @@ -633,6 +635,7 @@ func run() { x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative") cachePercentage := cache.GetString("percentage") + keepUpdates := cache.GetBool("keep-updates") cachePercent, err := x.GetCachePercentages(cachePercentage, 3) x.Check(err) postingListCacheSize := (cachePercent[0] * (totalCache << 20)) / 100 @@ -655,6 +658,7 @@ func run() { WALDir: Alpha.Conf.GetString("wal"), CacheMb: totalCache, CachePercentage: cachePercentage, + KeepUpdates: keepUpdates, MutationsMode: worker.AllowMutations, AuthToken: security.GetString("token"), @@ -782,7 +786,7 @@ func run() { // Posting will initialize index which requires schema. Hence, initialize // schema before calling posting.Init(). 
schema.Init(worker.State.Pstore) - posting.Init(worker.State.Pstore, postingListCacheSize) + posting.Init(worker.State.Pstore, postingListCacheSize, keepUpdates) defer posting.Cleanup() worker.Init(worker.State.Pstore) diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 2442024a4ee..22b4dadc556 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -1024,7 +1024,7 @@ func run() { db, err := badger.OpenManaged(bopts) x.Check(err) // Not using posting list cache - posting.Init(db, 0) + posting.Init(db, 0, false) defer db.Close() printSummary(db) diff --git a/go.mod b/go.mod index 906f0bff6f1..599bdc5fcf2 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/dgraph-io/gqlgen v0.13.2 github.com/dgraph-io/gqlparser/v2 v2.2.2 github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 - github.com/dgraph-io/ristretto/v2 v2.0.1 + github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470 github.com/dgraph-io/simdjson-go v0.3.0 github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da github.com/dgryski/go-groupvarint v0.0.0-20230630160417-2bfb7969fb3c diff --git a/go.sum b/go.sum index 795e6a4ca35..3912cbcf29c 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,8 @@ github.com/dgraph-io/gqlparser/v2 v2.2.2 h1:CnxXOKL4EPguKqcGV/z4u4VoW5izUkOTIsNM github.com/dgraph-io/gqlparser/v2 v2.2.2/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU= github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4= github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ= -github.com/dgraph-io/ristretto/v2 v2.0.1 h1:7W0LfEP+USCmtrUjJsk+Jv2jbhJmb72N4yRI7GrLdMI= -github.com/dgraph-io/ristretto/v2 v2.0.1/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo= +github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470 h1:rVn0l0Hvab+/kCvTMFrUFtRtob+jagzgVVSfbyqCJ5E= +github.com/dgraph-io/ristretto/v2 v2.0.1-0.20241225154905-c692ff024470/go.mod h1:K7caLeufSdxm+ITp1n/73U+VbFVAHrexfLbz4n14hpo= github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0= github.com/dgraph-io/simdjson-go v0.3.0/go.mod h1:Otpysdjaxj9OGaJusn4pgQV7OFh2bELuHANq0I78uvY= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= diff --git a/posting/index.go b/posting/index.go index 798de6eb52a..739b45898a1 100644 --- a/posting/index.go +++ b/posting/index.go @@ -1011,6 +1011,7 @@ func (r *rebuilder) Run(ctx context.Context) error { return nil, err } + memoryLayer.del(key) return &bpb.KVList{Kv: kvs}, nil } tmpStream.Send = func(buf *z.Buffer) error { diff --git a/posting/index_test.go b/posting/index_test.go index c09762a21ee..36f2d2278f5 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -157,6 +157,7 @@ func addMutation(t *testing.T, l *List, edge *pb.DirectedEdge, op uint32, } txn.Update() + txn.UpdateCachedKeys(commitTs) writer := NewTxnWriter(pstore) require.NoError(t, txn.CommitToDisk(writer, commitTs)) require.NoError(t, writer.Flush()) @@ -271,6 +272,7 @@ func addEdgeToUID(t *testing.T, attr string, src uint64, func TestCountReverseIndexWithData(t *testing.T) { require.NoError(t, pstore.DropAll()) + memoryLayer.clear() indexNameCountVal := "testcount: [uid] @count @reverse ." 
attr := x.GalaxyAttr("testcount") @@ -305,6 +307,7 @@ func TestCountReverseIndexWithData(t *testing.T) { func TestCountReverseIndexEmptyPosting(t *testing.T) { require.NoError(t, pstore.DropAll()) + memoryLayer.clear() indexNameCountVal := "testcount: [uid] @count @reverse ." attr := x.GalaxyAttr("testcount") diff --git a/posting/list.go b/posting/list.go index e8c641a2759..1952c83f4ce 100644 --- a/posting/list.go +++ b/posting/list.go @@ -121,6 +121,13 @@ func newMutableLayer() *MutableLayer { } } +func (mm *MutableLayer) setTs(readTs uint64) { + if mm == nil { + return + } + mm.readTs = readTs +} + // This function clones an existing mutable layer for the new transactions. This function makes sure we copy the right // things from the existing mutable layer for the new list. It basically copies committedEntries using reference and // ignores currentEntires and readTs. Similarly, all the cache items related to currentEntries are ignored and @@ -866,6 +873,12 @@ func GetConflictKey(pk x.ParsedKey, key []byte, t *pb.DirectedEdge) uint64 { return conflictKey } +// SetTs allows us to set the transaction timestamp in mutation map. Should be used before the posting list is passed +// on to the functions that would read the data. +func (l *List) SetTs(readTs uint64) { + l.mutationMap.setTs(readTs) +} + func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.DirectedEdge) error { l.AssertLock() @@ -978,6 +991,9 @@ func (l *List) setMutationAfterCommit(startTs, commitTs uint64, pl *pb.PostingLi } l.mutationMap.committedUids[mpost.Uid] = mpost + if l.mutationMap.length == math.MaxInt64 { + l.mutationMap.length = 0 + } l.mutationMap.length += getLengthDelta(mpost.Op) } @@ -999,7 +1015,6 @@ func (l *List) setMutation(startTs uint64, data []byte) { l.mutationMap = newMutableLayer() } l.mutationMap.setCurrentEntries(startTs, pl) - if pl.CommitTs != 0 { l.maxTs = x.Max(l.maxTs, pl.CommitTs) } @@ -1258,6 +1273,7 @@ func (l *List) getPostingAndLength(readTs, afterUid, uid uint64) (int, bool, *pb var count int var found bool var post *pb.Posting + err := l.iterate(readTs, afterUid, func(p *pb.Posting) error { if p.Uid == uid { post = p diff --git a/posting/list_test.go b/posting/list_test.go index 74a4df13dad..f4bdf9c1e8a 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -504,6 +504,7 @@ func TestAddMutation_mrjn1(t *testing.T) { func TestReadSingleValue(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 + require.Equal(t, nil, pstore.DropAll()) // We call pl.Iterate and then stop iterating in the first loop when we are reading // single values. 
This test confirms that the two functions, getFirst from this file @@ -518,6 +519,7 @@ func TestReadSingleValue(t *testing.T) { Value: []byte(fmt.Sprintf("ho hey there%d", i)), } txn := Txn{StartTs: i} + ol.mutationMap.setTs(i) addMutationHelper(t, ol, edge, Set, &txn) require.NoError(t, ol.commitMutation(i, i+1)) kData := ol.getMutation(i + 1) @@ -532,6 +534,8 @@ func TestReadSingleValue(t *testing.T) { kvs, err := ol.Rollup(nil, txn.StartTs-3) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) + // Delete item from global cache before reading, as we are not updating the cache in the test + memoryLayer.del(key) ol, err = getNew(key, ps, math.MaxUint64) require.NoError(t, err) } @@ -1803,7 +1807,7 @@ func TestMain(m *testing.M) { ps, err = badger.OpenManaged(badger.DefaultOptions(dir)) x.Panic(err) // Not using posting list cache - Init(ps, 0) + Init(ps, 0, false) schema.Init(ps) m.Run() diff --git a/posting/lists.go b/posting/lists.go index 933b547712c..7c439149565 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -18,17 +18,13 @@ package posting import ( "bytes" - "context" "fmt" "sync" - "time" - ostats "go.opencensus.io/stats" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgo/v240/protos/api" - "github.com/dgraph-io/ristretto/v2" "github.com/dgraph-io/ristretto/v2/z" "github.com/hypermodeinc/dgraph/v24/protos/pb" "github.com/hypermodeinc/dgraph/v24/tok/index" @@ -42,41 +38,15 @@ const ( var ( pstore *badger.DB closer *z.Closer - lCache *ristretto.Cache[[]byte, *List] ) // Init initializes the posting lists package, the in memory and dirty list hash. -func Init(ps *badger.DB, cacheSize int64) { +func Init(ps *badger.DB, cacheSize int64, keepUpdates bool) { pstore = ps closer = z.NewCloser(1) go x.MonitorMemoryMetrics(closer) - // Initialize cache. - if cacheSize == 0 { - return - } - - var err error - lCache, err = ristretto.NewCache[[]byte, *List](&ristretto.Config[[]byte, *List]{ - // Use 5% of cache memory for storing counters. 
- NumCounters: int64(float64(cacheSize) * 0.05 * 2), - MaxCost: int64(float64(cacheSize) * 0.95), - BufferItems: 64, - Metrics: true, - Cost: func(val *List) int64 { - return 0 - }, - }) - x.Check(err) - go func() { - m := lCache.Metrics - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for range ticker.C { - // Record the posting list cache hit ratio - ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio())) - } - }() + memoryLayer = initMemoryLayer(cacheSize, keepUpdates) } func UpdateMaxCost(maxCost int64) { @@ -145,7 +115,7 @@ func (vc *viLocalCache) GetWithLockHeld(key []byte) (rval index.Value, rerr erro func (vc *viLocalCache) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) { value := pl.findStaticValue(vc.delegate.startTs) - if value == nil { + if value == nil || len(value.Postings) == 0 { return nil, ErrNoValue } @@ -314,8 +284,9 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) } } else { pl = &List{ - key: key, - plist: new(pb.PostingList), + key: key, + plist: new(pb.PostingList), + mutationMap: newMutableLayer(), } } @@ -421,8 +392,6 @@ func (lc *LocalCache) UpdateDeltasAndDiscardLists() { } for key, pl := range lc.plists { - //pk, _ := x.Parse([]byte(key)) - //fmt.Printf("{TXN} Closing %v\n", pk) data := pl.getMutation(lc.startTs) if len(data) > 0 { lc.deltas[key] = data diff --git a/posting/mvcc.go b/posting/mvcc.go index 5feeb0e511a..4144886729a 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -18,20 +18,23 @@ package posting import ( "bytes" + "context" "encoding/hex" + "math" "strconv" - "strings" "sync" "sync/atomic" "time" "github.com/golang/glog" "github.com/pkg/errors" + ostats "go.opencensus.io/stats" "google.golang.org/protobuf/proto" "github.com/dgraph-io/badger/v4" bpb "github.com/dgraph-io/badger/v4/pb" "github.com/dgraph-io/dgo/v240/protos/api" + "github.com/dgraph-io/ristretto/v2" "github.com/dgraph-io/ristretto/v2/z" "github.com/hypermodeinc/dgraph/v24/protos/pb" "github.com/hypermodeinc/dgraph/v24/x" @@ -63,12 +66,7 @@ type CachePL struct { count int list *List lastUpdate uint64 -} - -type GlobalCache struct { - sync.RWMutex - - items map[string]*CachePL + lastRead time.Time } var ( @@ -84,10 +82,10 @@ var ( IncrRollup = &incrRollupi{ priorityKeys: make([]*pooledKeys, 2), } - - globalCache = &GlobalCache{items: make(map[string]*CachePL, 100)} ) +var memoryLayer *MemoryLayer + func init() { x.AssertTrue(len(IncrRollup.priorityKeys) == 2) for i := range IncrRollup.priorityKeys { @@ -134,13 +132,7 @@ func (ir *incrRollupi) rollUpKey(writer *TxnWriter, key []byte) error { } RemoveCacheFor(key) - - globalCache.Lock() - val, ok := globalCache.items[string(key)] - if ok { - val.list = nil - } - globalCache.Unlock() + memoryLayer.del(key) // TODO Update cache with rolled up results // If we do a rollup, we typically won't need to update the key in cache. // The only caveat is that the key written by rollup would be written at +1 @@ -346,20 +338,113 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error { } func ResetCache() { - if lCache != nil { - lCache.Clear() - } - globalCache.Lock() - globalCache.items = make(map[string]*CachePL) - globalCache.Unlock() + memoryLayer.clear() } // RemoveCacheFor will delete the list corresponding to the given key. func RemoveCacheFor(key []byte) { - // TODO: investigate if this can be done by calling Set with a nil value. 
- if lCache != nil { - lCache.Del(key) + memoryLayer.del(key) +} + +type Cache struct { + data *ristretto.Cache[[]byte, *CachePL] + + numCacheRead int + numCacheReadFails int + numCacheSave int +} + +func (c *Cache) wait() { + if c == nil { + return + } + c.data.Wait() +} + +func (c *Cache) get(key []byte) (*CachePL, bool) { + if c == nil { + return nil, false + } + val, ok := c.data.Get(key) + if !ok { + c.numCacheReadFails += 1 + return val, ok + } + if val.list == nil { + c.numCacheReadFails += 1 + return nil, false + } + c.numCacheRead += 1 + return val, true +} + +func (c *Cache) set(key []byte, i *CachePL) { + if c == nil { + return + } + c.numCacheSave += 1 + c.data.Set(key, i, 1) +} + +func (c *Cache) del(key []byte) { + if c == nil { + return + } + c.data.Del(key) +} + +func (c *Cache) clear() { + if c == nil { + return + } + c.data.Clear() +} + +type MemoryLayer struct { + keepUpdates bool + cache *Cache + + numDisksRead int +} + +func (ml *MemoryLayer) clear() { + ml.cache.clear() +} +func (ml *MemoryLayer) del(key []byte) { + ml.cache.del(key) +} + +func initMemoryLayer(cacheSize int64, keepUpdates bool) *MemoryLayer { + ml := &MemoryLayer{} + ml.keepUpdates = keepUpdates + if cacheSize > 0 { + cache, err := ristretto.NewCache[[]byte, *CachePL](&ristretto.Config[[]byte, *CachePL]{ + // Use 5% of cache memory for storing counters. + NumCounters: int64(float64(cacheSize) * 0.05 * 2), + MaxCost: int64(float64(cacheSize) * 0.95), + BufferItems: 16, + Metrics: true, + Cost: func(val *CachePL) int64 { + return 1 + }, + ShouldUpdate: func(cur, prev *CachePL) bool { + return !(cur.list != nil && prev.list != nil && prev.list.maxTs > cur.list.maxTs) + }, + }) + x.Check(err) + go func() { + m := cache.Metrics + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + for range ticker.C { + // Record the posting list cache hit ratio + ostats.Record(context.Background(), x.PLCacheHitRatio.M(m.Ratio())) + } + }() + + ml.cache = &Cache{data: cache} } + return ml } func NewCachePL() *CachePL { @@ -370,42 +455,55 @@ func NewCachePL() *CachePL { } } +func checkForRollup(key []byte, l *List) { + deltaCount := l.mutationMap.len() + // If deltaCount is high, send it to high priority channel instead. + if deltaCount > 500 { + IncrRollup.addKeyToBatch(key, 0) + } +} + +func (ml *MemoryLayer) wait() { + ml.cache.wait() +} + +func (ml *MemoryLayer) updateItemInCache(key string, delta []byte, startTs, commitTs uint64) { + if commitTs == 0 { + return + } + + if !ml.keepUpdates { + // TODO We should mark the key as deleted instead of directly deleting from the cache. + ml.del([]byte(key)) + return + } + + val, ok := ml.cache.get([]byte(key)) + if !ok { + return + } + + val.lastUpdate = commitTs + val.count -= 1 + + if val.list != nil && ml.keepUpdates { + p := new(pb.PostingList) + x.Check(proto.Unmarshal(delta, p)) + + val.list.setMutationAfterCommit(startTs, commitTs, p, true) + checkForRollup([]byte(key), val.list) + } +} + // RemoveCachedKeys will delete the cached list by this txn. 
func (txn *Txn) UpdateCachedKeys(commitTs uint64) { if txn == nil || txn.cache == nil { return } + memoryLayer.wait() for key, delta := range txn.cache.deltas { - RemoveCacheFor([]byte(key)) - pk, _ := x.Parse([]byte(key)) - if !ShouldGoInCache(pk) { - continue - } - globalCache.Lock() - val, ok := globalCache.items[key] - if !ok { - val = NewCachePL() - val.lastUpdate = commitTs - globalCache.items[key] = val - } - if commitTs != 0 { - // TODO Delete this if the values are too old in an async thread - val.lastUpdate = commitTs - } - if !ok { - globalCache.Unlock() - continue - } - - val.count -= 1 - - if commitTs != 0 && val.list != nil { - p := new(pb.PostingList) - x.Check(proto.Unmarshal(delta, p)) - val.list.setMutationAfterCommit(txn.StartTs, commitTs, p, true) - } - globalCache.Unlock() + memoryLayer.updateItemInCache(key, delta, txn.StartTs, commitTs) } } @@ -449,6 +547,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { l := new(List) l.key = key l.plist = new(pb.PostingList) + l.minTs = 0 // We use the following block of code to trigger incremental rollup on this key. deltaCount := 0 @@ -477,14 +576,13 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { switch item.UserMeta() { case BitEmptyPosting: - l.minTs = item.Version() return l, nil case BitCompletePosting: if err := unmarshalOrCopy(l.plist, item); err != nil { return nil, err } - l.minTs = item.Version() + l.minTs = item.Version() // No need to do Next here. The outer loop can take care of skipping // more versions of the same key. return l, nil @@ -539,66 +637,25 @@ func (c *CachePL) Set(l *List, readTs uint64) { } } -func ShouldGoInCache(pk x.ParsedKey) bool { - return (!pk.IsData() && strings.HasSuffix(pk.Attr, "dgraph.type")) -} - -func PostingListCacheEnabled() bool { - return lCache != nil -} - -func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - return getNew(key, pstore, readTs) -} - -func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { - if PostingListCacheEnabled() { - l, ok := lCache.Get(key) - if ok && l != nil { - // No need to clone the immutable layer or the key since mutations will not modify it. - lCopy := &List{ - minTs: l.minTs, - maxTs: l.maxTs, - key: key, - plist: l.plist, - } - l.RLock() - lCopy.mutationMap = l.mutationMap.clone() - l.RUnlock() - return lCopy, nil - } - } - - if pstore.IsClosed() { - return nil, badger.ErrDBClosed - } - - pk, _ := x.Parse(key) +func (ml *MemoryLayer) readFromCache(key []byte, readTs uint64) *List { + cacheItem, ok := ml.cache.get(key) - if ShouldGoInCache(pk) { - globalCache.Lock() - cacheItem, ok := globalCache.items[string(key)] - if !ok { - cacheItem = NewCachePL() - globalCache.items[string(key)] = cacheItem - } + if ok { cacheItem.count += 1 - - // We use badger subscription to invalidate the cache. For every write we make the value - // corresponding to the key in the cache to nil. So, if we get some non-nil value from the cache - // then it means that no writes have happened after the last set of this key in the cache. 
- if ok { - if cacheItem.list != nil && cacheItem.list.minTs <= readTs { - cacheItem.list.RLock() - lCopy := copyList(cacheItem.list) - cacheItem.list.RUnlock() - globalCache.Unlock() - return lCopy, nil - } + cacheItem.lastRead = time.Now() + if cacheItem.list != nil && cacheItem.list.minTs <= readTs { + cacheItem.list.RLock() + lCopy := copyList(cacheItem.list) + cacheItem.list.RUnlock() + checkForRollup(key, lCopy) + return lCopy } - globalCache.Unlock() } + return nil +} +func (ml *MemoryLayer) readFromDisk(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + ml.numDisksRead += 1 txn := pstore.NewTransactionAt(readTs, false) defer txn.Discard() @@ -614,30 +671,60 @@ func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { if err != nil { return l, err } + return l, nil +} - // Only set l to the cache if readTs >= latestTs, which implies that l is - // the latest version of the PL. We also check that we're reading a version - // from Badger, which is higher than the write registered by the cache. - if ShouldGoInCache(pk) { - globalCache.Lock() - l.RLock() - cacheItem, ok := globalCache.items[string(key)] - if !ok { - cacheItemNew := NewCachePL() - cacheItemNew.count = 1 - cacheItemNew.list = copyList(l) - cacheItemNew.lastUpdate = l.maxTs - globalCache.items[string(key)] = cacheItemNew - } else { - cacheItem.Set(copyList(l), readTs) - } - l.RUnlock() - globalCache.Unlock() +// Saves the data in the cache. The caller must ensure that the list provided is the latest possible. +func (ml *MemoryLayer) saveInCache(key []byte, l *List) { + l.RLock() + defer l.RUnlock() + cacheItem := NewCachePL() + cacheItem.count = 1 + cacheItem.list = copyList(l) + cacheItem.lastUpdate = l.maxTs + ml.cache.set(key, cacheItem) +} + +func (ml *MemoryLayer) ReadData(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + // We first try to read the data from cache, if it is present. If it's not present, then we would read the + // latest data from the disk. This would get stored in the cache. If this read has a minTs > readTs then + // we would have to read the correct timestamp from the disk. 
+ l := ml.readFromCache(key, readTs) + if l != nil { + l.mutationMap.setTs(readTs) + return l, nil + } + l, err := ml.readFromDisk(key, pstore, math.MaxUint64) + if err != nil { + return nil, err + } + ml.saveInCache(key, l) + if l.minTs == 0 || readTs >= l.minTs { + l.mutationMap.setTs(readTs) + return l, nil + } + + l, err = ml.readFromDisk(key, pstore, readTs) + if err != nil { + return nil, err } - if PostingListCacheEnabled() { - lCache.Set(key, l, 0) + l.mutationMap.setTs(readTs) + return l, nil +} + +func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + return getNew(key, pstore, readTs) +} + +func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { + if pstore.IsClosed() { + return nil, badger.ErrDBClosed } + l, err := memoryLayer.ReadData(key, pstore, readTs) + if err != nil { + return l, err + } return l, nil } diff --git a/posting/mvcc_test.go b/posting/mvcc_test.go index 88c227a9802..8b444bc89b4 100644 --- a/posting/mvcc_test.go +++ b/posting/mvcc_test.go @@ -19,14 +19,18 @@ package posting import ( "context" "math" + "math/rand" + "os" "testing" "time" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/ristretto/v2/z" "github.com/hypermodeinc/dgraph/v24/protos/pb" + "github.com/hypermodeinc/dgraph/v24/schema" "github.com/hypermodeinc/dgraph/v24/x" ) @@ -90,12 +94,64 @@ func TestCacheAfterDeltaUpdateRecieved(t *testing.T) { // Read key at timestamp 10. Make sure cache is not updated by this, as there is a later read. l, err := GetNoStore(key, 10) require.NoError(t, err) - require.Equal(t, l.mutationMap.len(), 0) + require.Equal(t, l.mutationMap.listLen(10), 0) // Read at 20 should show the value l1, err := GetNoStore(key, 20) require.NoError(t, err) - require.Equal(t, l1.mutationMap.len(), 1) + require.Equal(t, l1.mutationMap.listLen(20), 1) +} + +func BenchmarkTestCache(b *testing.B) { + dir, err := os.MkdirTemp("", "storetest_") + x.Panic(err) + defer os.RemoveAll(dir) + + ps, err = badger.OpenManaged(badger.DefaultOptions(dir)) + x.Panic(err) + Init(ps, 10000000, true) + schema.Init(ps) + + attr := x.GalaxyAttr("cache") + keys := make([][]byte, 0) + N := uint64(10000) + NInt := 10000 + txn := Oracle().RegisterStartTs(1) + + for i := uint64(1); i < N; i++ { + key := x.DataKey(attr, i) + keys = append(keys, key) + edge := &pb.DirectedEdge{ + ValueId: 2, + Attr: attr, + Entity: 1, + Op: pb.DirectedEdge_SET, + } + l, _ := GetNoStore(key, 1) + // No index entries added here as we do not call AddMutationWithIndex. 
+ txn.cache.SetIfAbsent(string(l.key), l) + err := l.addMutation(context.Background(), txn, edge) + if err != nil { + panic(err) + } + } + txn.Update() + writer := NewTxnWriter(pstore) + err = txn.CommitToDisk(writer, 2) + if err != nil { + panic(err) + } + writer.Flush() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + key := keys[rand.Intn(NInt-1)] + _, err = getNew(key, pstore, math.MaxUint64) + if err != nil { + panic(err) + } + } + }) } func TestRollupTimestamp(t *testing.T) { @@ -120,6 +176,7 @@ func TestRollupTimestamp(t *testing.T) { Value: []byte(x.Star), Op: pb.DirectedEdge_DEL, } + l.mutationMap.setTs(9) addMutation(t, l, edge, Del, 9, 10, false) nl, err := getNew(key, pstore, math.MaxUint64) @@ -154,6 +211,8 @@ func TestPostingListRead(t *testing.T) { writer := NewTxnWriter(pstore) require.NoError(t, writer.SetAt(key, []byte{}, BitEmptyPosting, 6)) require.NoError(t, writer.Flush()) + // Delete the key from cache as we have just updated it + memoryLayer.del(key) assertLength(7, 0) addEdgeToUID(t, attr, 1, 4, 7, 8) @@ -166,6 +225,7 @@ func TestPostingListRead(t *testing.T) { writer = NewTxnWriter(pstore) require.NoError(t, writer.SetAt(key, data, BitCompletePosting, 10)) require.NoError(t, writer.Flush()) + memoryLayer.del(key) assertLength(10, 0) addEdgeToUID(t, attr, 1, 5, 11, 12) diff --git a/posting/oracle.go b/posting/oracle.go index f84ff66d977..a47d1e52a5e 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -105,8 +105,9 @@ func (vt *viTxn) GetWithLockHeld(key []byte) (rval index.Value, rerr error) { func (vt *viTxn) GetValueFromPostingList(pl *List) (rval index.Value, rerr error) { value := pl.findStaticValue(vt.delegate.StartTs) - if value == nil { - //fmt.Println("DIFF", val, err, nil, badger.ErrKeyNotFound) + // When the posting is deleted, we find the key in the badger, but no postings in it. This should also + // return ErrKeyNotFound as that is what we except in the later functions. + if value == nil || len(value.Postings) == 0 { return nil, ErrNoValue } diff --git a/systest/21million/bulk/run_test.go b/systest/21million/bulk/run_test.go index f6e95173d88..0fd2a5dd568 100644 --- a/systest/21million/bulk/run_test.go +++ b/systest/21million/bulk/run_test.go @@ -19,10 +19,17 @@ package bulk import ( + "context" + "fmt" + "io" "os" "path/filepath" + "runtime" + "strings" "testing" + "time" + "github.com/hypermodeinc/dgraph/v24/chunker" "github.com/hypermodeinc/dgraph/v24/systest/21million/common" "github.com/hypermodeinc/dgraph/v24/testutil" ) @@ -31,6 +38,47 @@ func TestQueries(t *testing.T) { t.Run("Run queries", common.TestQueriesFor21Million) } +func BenchmarkQueries(b *testing.B) { + _, thisFile, _, _ := runtime.Caller(0) + queryDir := filepath.Join(filepath.Dir(thisFile), "../queries") + + // For this test we DON'T want to start with an empty database. + dg, err := testutil.DgraphClient(testutil.ContainerAddr("alpha1", 9080)) + if err != nil { + panic(fmt.Sprintf("Error while getting a dgraph client: %v", err)) + } + + files, err := os.ReadDir(queryDir) + if err != nil { + panic(fmt.Sprintf("Error reading directory: %s", err.Error())) + } + + for _, file := range files { + if !strings.HasPrefix(file.Name(), "query-") { + continue + } + b.Run(file.Name(), func(b *testing.B) { + filename := filepath.Join(queryDir, file.Name()) + reader, cleanup := chunker.FileReader(filename, nil) + bytes, _ := io.ReadAll(reader) + contents := string(bytes[:]) + cleanup() + + // The test query and expected result are separated by a delimiter. 
+ bodies := strings.SplitN(contents, "\n---\n", 2) + // Dgraph can get into unhealthy state sometime. So, add retry for every query. + // If a query takes too long to run, it probably means dgraph is stuck and there's + // no point in waiting longer or trying more tests. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + _, err := dg.NewTxn().Query(ctx, bodies[0]) + if err != nil { + panic(err) + } + cancel() + }) + } +} + func TestMain(m *testing.M) { schemaFile := filepath.Join(testutil.TestDataDirectory, "21million.schema") rdfFile := filepath.Join(testutil.TestDataDirectory, "21million.rdf.gz") @@ -50,7 +98,6 @@ func TestMain(m *testing.M) { if err := testutil.StartAlphas("./alpha.yml"); err != nil { cleanupAndExit(1) } - exitCode := m.Run() cleanupAndExit(exitCode) } diff --git a/worker/config.go b/worker/config.go index 030bcaf2220..05fc6958b0e 100644 --- a/worker/config.go +++ b/worker/config.go @@ -63,6 +63,11 @@ type Options struct { CachePercentage string // CacheMb is the total memory allocated between all the caches. CacheMb int64 + // KeepUpdates is the parameter that allows the user to set if the cache should keep the items that were + // just mutated. Keeping these items are good when there is a mixed workload where you are updating the + // same element multiple times. However, for a heavy mutation workload, not keeping these items would be better + // , as keeping these elements bloats the cache making it slow. + KeepUpdates bool Audit *x.LoggerConf diff --git a/worker/draft.go b/worker/draft.go index d2bd6facc36..4380ad11d79 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -357,9 +357,9 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr return err } - // TODO: Revisit this when we work on posting cache. Clear entire cache. + // TODO: Revisit this when we work on posting cache. Don't clear entire cache. // We don't want to drop entire cache, just due to one namespace. 
- // posting.ResetCache() + posting.ResetCache() return nil } diff --git a/worker/mutation_unit_test.go b/worker/mutation_unit_test.go index 388d82bddf4..7ed4c3d99d2 100644 --- a/worker/mutation_unit_test.go +++ b/worker/mutation_unit_test.go @@ -42,7 +42,7 @@ func TestReverseEdge(t *testing.T) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("revc: [uid] @reverse @count ."), 1) require.NoError(t, err) diff --git a/worker/online_restore.go b/worker/online_restore.go index 051dc8d3edf..2b891a07e29 100644 --- a/worker/online_restore.go +++ b/worker/online_restore.go @@ -410,6 +410,7 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest, pidx uin return errors.Wrapf(err, "cannot load schema after restore") } + posting.ResetCache() ResetAclCache() // Reset gql schema only when the restore is not partial, so that after this restore diff --git a/worker/sort_test.go b/worker/sort_test.go index 70e0d9fa9b9..0c67f2a0689 100644 --- a/worker/sort_test.go +++ b/worker/sort_test.go @@ -19,6 +19,7 @@ package worker import ( "context" "fmt" + "math/rand" "os" "testing" @@ -52,6 +53,7 @@ func rollup(t *testing.T, key []byte, pstore *badger.DB, readTs uint64) { kvs, err := ol.Rollup(nil, readTs+1) require.NoError(t, err) require.NoError(t, writePostingListToDisk(kvs)) + posting.ResetCache() } func writePostingListToDisk(kvs []*bpb.KV) error { @@ -73,7 +75,7 @@ func TestSingleUid(t *testing.T) { ps, err := badger.OpenManaged(opt) x.Check(err) pstore = ps - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("singleUidTest: string @index(exact) @unique ."), 1) require.NoError(t, err) @@ -168,7 +170,7 @@ func TestLangExact(t *testing.T) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("testLang: string @index(term) @lang ."), 1) require.NoError(t, err) @@ -214,6 +216,16 @@ func TestLangExact(t *testing.T) { require.Equal(t, val.Value, []byte("hindi")) } +const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + +func randStringBytes(n int) string { + b := make([]byte, n) + for i := range b { + b[i] = letterBytes[rand.Intn(len(letterBytes))] + } + return string(b) +} + func BenchmarkAddMutationWithIndex(b *testing.B) { dir, err := os.MkdirTemp("", "storetest_") x.Check(err) @@ -224,7 +236,7 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { x.Check(err) pstore = ps // Not using posting list cache - posting.Init(ps, 0) + posting.Init(ps, 0, false) Init(ps) err = schema.ParseBytes([]byte("benchmarkadd: string @index(term) ."), 1) fmt.Println(err) @@ -235,11 +247,17 @@ func BenchmarkAddMutationWithIndex(b *testing.B) { txn := posting.Oracle().RegisterStartTs(5) attr := x.GalaxyAttr("benchmarkadd") + n := uint64(1000) + values := make([]string, 0) + for range n { + values = append(values, randStringBytes(5)) + } + for i := 0; i < b.N; i++ { edge := &pb.DirectedEdge{ - Value: []byte("david"), + Value: []byte(values[rand.Intn(int(n))]), Attr: attr, - Entity: 1, + Entity: rand.Uint64()%n + 1, Op: pb.DirectedEdge_SET, } diff --git a/worker/worker_test.go b/worker/worker_test.go index 90ae194cacb..3f6ac646ddf 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -56,6 +56,7 @@ func commitTransaction(t *testing.T, edge *pb.DirectedEdge, l *posting.List) { startTs := timestamp() txn := posting.Oracle().RegisterStartTs(startTs) l = 
txn.Store(l)
+	l.SetTs(startTs)
 	require.NoError(t, l.AddMutationWithIndex(context.Background(), edge, txn))
 
 	commit := commitTs(startTs)
@@ -500,7 +501,7 @@ func TestMain(m *testing.M) {
 	x.Check(err)
 	pstore = ps
 	// Not using posting list cache
-	posting.Init(ps, 0)
+	posting.Init(ps, 0, false)
 	Init(ps)
 
 	m.Run()
diff --git a/x/keys.go b/x/keys.go
index 906416173b2..4740db3118c 100644
--- a/x/keys.go
+++ b/x/keys.go
@@ -307,7 +307,7 @@ func (p ParsedKey) String() string {
 	} else if p.IsCountOrCountRev() {
 		return fmt.Sprintf("UID: %v, Attr: %v, IsCount/Ref: true, Count: %v", p.Uid, p.Attr, p.Count)
 	} else {
-		return fmt.Sprintf("UID: %v, Attr: %v, Data key", p.Uid, p.Attr)
+		return fmt.Sprintf("UID: %v, Attr: %v, Data key, prefix: %v, byte: %v", p.Uid, p.Attr, p.bytePrefix, p.ByteType)
 	}
 }
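
Usage sketch (illustrative only, not part of the patch): the snippet below shows how a caller would wire up the cache introduced above. posting.Init now takes the keepUpdates parameter added in this change, and reads go through the new MemoryLayer via GetNew; on the CLI the same switch is exposed through the cache superflag (assumed syntax, e.g. --cache "size-mb=1024; percentage=50,30,20; keep-updates=false"). The temp-directory handling and the "name" attribute below are hypothetical.

package main

import (
	"fmt"
	"log"
	"math"
	"os"

	"github.com/dgraph-io/badger/v4"
	"github.com/hypermodeinc/dgraph/v24/posting"
	"github.com/hypermodeinc/dgraph/v24/x"
)

func main() {
	dir, err := os.MkdirTemp("", "pstore")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	db, err := badger.OpenManaged(badger.DefaultOptions(dir))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// cacheSize > 0 enables the ristretto-backed MemoryLayer cache.
	// keepUpdates=true keeps freshly committed deltas in the cache (useful for
	// mixed read/write workloads); keepUpdates=false evicts a key as soon as a
	// commit touches it (better for write-heavy workloads).
	posting.Init(db, 64<<20, true)
	defer posting.Cleanup()

	// Reads go through MemoryLayer.ReadData: a cache hit returns a clone of the
	// cached list, a miss reads the latest version from Badger and caches it.
	key := x.DataKey(x.GalaxyAttr("name"), 1)
	l, err := posting.GetNew(key, db, math.MaxUint64)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("posting list loaded:", l != nil)
}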