Skip to content

Commit

Permalink
[3.2.1 Backport] CBG-4151: Memory-based rev cache size (#7134)
Browse files Browse the repository at this point in the history
* CBG-4023: Removal unmarshalled body on the rev cache (#7030)

* CBG-4135: new stat for rev cache capacity (#7049)

* CBG-4135: new stat for rev cache capacity

* updated based on review

* fix linter

* lint again

* CBG-4032: memory based rev cache implementation (#7040)

* CBG-4032: memory based rev cache implementation

* make test pass with default collection + missing return param

* more issues with default collection test

* touch ups on comment + remove unused function

* updated for review

* further tidy up

* lint fix after refector test

* fixes from rebase

* updates based off review + some more refactoring

* CBG-4134: link rev cache memory limit config option to rev cache (#7084)

* CBG-4134: link rev cache memory limit config option to rev cache

* failing tests

* address commnets

* fix yaml lint

* fix failing test + mistake in docs

Signed-off-by: Gregory Newman-Smith <[email protected]>

---------

Signed-off-by: Gregory Newman-Smith <[email protected]>

* CBG-4234: clean up rev cache work (#7113)

* CBG-4234: clean up rev cache work

* new test

* CBG-4277: Remove unused totalBytesForHistory from getHistory (#7137)

---------

Signed-off-by: Gregory Newman-Smith <[email protected]>
Co-authored-by: Gregory Newman-Smith <[email protected]>
  • Loading branch information
bbrks and gregns1 authored Sep 27, 2024
1 parent 73d6f7e commit f682b48
Show file tree
Hide file tree
Showing 23 changed files with 1,099 additions and 304 deletions.
14 changes: 14 additions & 0 deletions base/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,12 +460,16 @@ type CacheStats struct {
NumSkippedSeqs *SgwIntStat `json:"num_skipped_seqs"`
// The total number of pending sequences. These are out-of-sequence entries waiting to be cached.
PendingSeqLen *SgwIntStat `json:"pending_seq_len"`
// Total number of items in the rev cache
RevisionCacheNumItems *SgwIntStat `json:"revision_cache_num_items"`
// The total number of revision cache bypass operations performed.
RevisionCacheBypass *SgwIntStat `json:"rev_cache_bypass"`
// The total number of revision cache hits.
RevisionCacheHits *SgwIntStat `json:"rev_cache_hits"`
// The total number of revision cache misses.
RevisionCacheMisses *SgwIntStat `json:"rev_cache_misses"`
// Total memory used by the rev cache
RevisionCacheTotalMemory *SgwIntStat `json:"revision_cache_total_memory"`
// The current length of the pending skipped sequence slice.
SkippedSeqLen *SgwIntStat `json:"skipped_seq_len"`
// The current capacity of the skipped sequence slice
Expand Down Expand Up @@ -1355,6 +1359,10 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.RevisionCacheNumItems, err = NewIntStat(SubsystemCacheKey, "revision_cache_num_items", StatUnitNoUnits, RevCacheNumItemsDesc, StatAddedVersion3dot2dot1, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.RevisionCacheBypass, err = NewIntStat(SubsystemCacheKey, "rev_cache_bypass", StatUnitNoUnits, RevCacheBypassDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
Expand All @@ -1367,6 +1375,10 @@ func (d *DbStats) initCacheStats() error {
if err != nil {
return err
}
resUtil.RevisionCacheTotalMemory, err = NewIntStat(SubsystemCacheKey, "revision_cache_total_memory", StatUnitNoUnits, RevCacheMemoryDesc, StatAddedVersion3dot2dot1, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
}
resUtil.SkippedSeqLen, err = NewIntStat(SubsystemCacheKey, "skipped_seq_len", StatUnitNoUnits, SkippedSeqLengthDesc, StatAddedVersion3dot0dot0, StatDeprecatedVersionNotDeprecated, StatStabilityCommitted, labelKeys, labelVals, prometheus.GaugeValue, 0)
if err != nil {
return err
Expand Down Expand Up @@ -1412,9 +1424,11 @@ func (d *DbStats) unregisterCacheStats() {
prometheus.Unregister(d.CacheStats.SkippedSeqCap)
prometheus.Unregister(d.CacheStats.NumCurrentSeqsSkipped)
prometheus.Unregister(d.CacheStats.PendingSeqLen)
prometheus.Unregister(d.CacheStats.RevisionCacheNumItems)
prometheus.Unregister(d.CacheStats.RevisionCacheBypass)
prometheus.Unregister(d.CacheStats.RevisionCacheHits)
prometheus.Unregister(d.CacheStats.RevisionCacheMisses)
prometheus.Unregister(d.CacheStats.RevisionCacheTotalMemory)
prometheus.Unregister(d.CacheStats.SkippedSeqLen)
prometheus.Unregister(d.CacheStats.ViewQueries)
}
Expand Down
4 changes: 4 additions & 0 deletions base/stats_descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ const (

PendingSeqLengthDesc = "The total number of pending sequences. These are out-of-sequence entries waiting to be cached."

RevCacheNumItemsDesc = "The total number of items in the revision cache."

RevCacheBypassDesc = "The total number of revision cache bypass operations performed."

RevCacheHitsDesc = "The total number of revision cache hits. This metric can be used to calculate the ratio of revision cache hits: " +
Expand All @@ -134,6 +136,8 @@ const (
RevCacheMissesDesc = "The total number of revision cache misses. This metric can be used to calculate the ratio of revision cache misses: " +
"Rev Cache Miss Ratio = rev_cache_misses / (rev_cache_hits + rev_cache_misses)"

RevCacheMemoryDesc = "The approximation of total memory taken up by rev cache for documents. This is measured by the raw document body, the channels allocated to a document and its revision history."

SkippedSeqLengthDesc = "The current length of the pending skipped sequence slice."

SkippedSeqCapDesc = "The current capacity of the skipped sequence slice."
Expand Down
4 changes: 2 additions & 2 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (db *DatabaseCollectionWithUser) addDocToChangeEntry(ctx context.Context, e
}

func (db *DatabaseCollectionWithUser) AddDocToChangeEntryUsingRevCache(ctx context.Context, entry *ChangeEntry, revID string) (err error) {
rev, err := db.getRev(ctx, entry.ID, revID, 0, nil, RevCacheIncludeBody)
rev, err := db.getRev(ctx, entry.ID, revID, 0, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func (db *DatabaseCollectionWithUser) buildRevokedFeed(ctx context.Context, ch c

// UserHasDocAccess checks whether the user has access to the active revision of the document
func UserHasDocAccess(ctx context.Context, collection *DatabaseCollectionWithUser, docID string) (bool, error) {
currentRev, err := collection.revisionCache.GetActive(ctx, docID, false)
currentRev, err := collection.revisionCache.GetActive(ctx, docID)
if err != nil {
if base.IsDocNotFoundError(err) {
return false, nil
Expand Down
51 changes: 24 additions & 27 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (db *DatabaseCollectionWithUser) GetRev(ctx context.Context, docID, revID s
if history {
maxHistory = math.MaxInt32
}
return db.getRev(ctx, docID, revID, maxHistory, nil, RevCacheOmitBody)
return db.getRev(ctx, docID, revID, maxHistory, nil)
}

// Returns the body of the current revision of a document
Expand All @@ -278,7 +278,7 @@ func (db *DatabaseCollectionWithUser) Get1xRevBody(ctx context.Context, docid, r

// Retrieves rev with request history specified as collection of revids (historyFrom)
func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string, attachmentsSince []string, showExp bool) (Body, error) {
rev, err := db.getRev(ctx, docid, revid, maxHistory, historyFrom, RevCacheIncludeBody)
rev, err := db.getRev(ctx, docid, revid, maxHistory, historyFrom)
if err != nil {
return nil, err
}
Expand All @@ -305,14 +305,14 @@ func (db *DatabaseCollectionWithUser) Get1xRevBodyWithHistory(ctx context.Contex
// - attachmentsSince is nil to return no attachment bodies, otherwise a (possibly empty) list of
// revisions for which the client already has attachments and doesn't need bodies. Any attachment
// that hasn't changed since one of those revisions will be returned as a stub.
func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string, includeBody bool) (revision DocumentRevision, err error) {
func (db *DatabaseCollectionWithUser) getRev(ctx context.Context, docid, revid string, maxHistory int, historyFrom []string) (revision DocumentRevision, err error) {
if revid != "" {
// Get a specific revision body and history from the revision cache
// (which will load them if necessary, by calling revCacheLoader, above)
revision, err = db.revisionCache.Get(ctx, docid, revid, includeBody, RevCacheOmitDelta)
revision, err = db.revisionCache.Get(ctx, docid, revid, RevCacheOmitDelta)
} else {
// No rev ID given, so load active revision
revision, err = db.revisionCache.GetActive(ctx, docid, includeBody)
revision, err = db.revisionCache.GetActive(ctx, docid)
}

if err != nil {
Expand Down Expand Up @@ -373,7 +373,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR
return nil, nil, nil
}

fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheOmitBody, RevCacheIncludeDelta)
fromRevision, err := db.revisionCache.Get(ctx, docID, fromRevID, RevCacheIncludeDelta)

// If the fromRevision is a removal cache entry (no body), but the user has access to that removal, then just
// return 404 missing to indicate that the body of the revision is no longer available.
Expand Down Expand Up @@ -413,7 +413,7 @@ func (db *DatabaseCollectionWithUser) GetDelta(ctx context.Context, docID, fromR

// db.DbStats.StatsDeltaSync().Add(base.StatKeyDeltaCacheMisses, 1)
db.dbStats().DeltaSync().DeltaCacheMiss.Add(1)
toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheOmitBody, RevCacheIncludeDelta)
toRevision, err := db.revisionCache.Get(ctx, docID, toRevID, RevCacheIncludeDelta)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -549,38 +549,36 @@ func (col *DatabaseCollectionWithUser) authorizeDoc(doc *Document, revid string)

// Gets a revision of a document. If it's obsolete it will be loaded from the database if possible.
// inline "_attachments" properties in the body will be extracted and returned separately if present (pre-2.5 metadata, or backup revisions)
func (c *DatabaseCollection) getRevision(ctx context.Context, doc *Document, revid string) (bodyBytes []byte, body Body, attachments AttachmentsMeta, err error) {
func (c *DatabaseCollection) getRevision(ctx context.Context, doc *Document, revid string) (bodyBytes []byte, attachments AttachmentsMeta, err error) {
bodyBytes = doc.getRevisionBodyJSON(ctx, revid, c.RevisionBodyLoader)

// No inline body, so look for separate doc:
if bodyBytes == nil {
if !doc.History.contains(revid) {
return nil, nil, nil, ErrMissing
return nil, nil, ErrMissing
}

bodyBytes, err = c.getOldRevisionJSON(ctx, doc.ID, revid)
if err != nil || bodyBytes == nil {
return nil, nil, nil, err
return nil, nil, err
}
}

// optimistically grab the doc body and to store as a pre-unmarshalled version, as well as anticipating no inline attachments.
if doc.CurrentRev == revid {
body = doc._body
attachments = doc.Attachments
}

// handle backup revision inline attachments, or pre-2.5 meta
if inlineAtts, cleanBodyBytes, cleanBody, err := extractInlineAttachments(bodyBytes); err != nil {
return nil, nil, nil, err
if inlineAtts, cleanBodyBytes, _, err := extractInlineAttachments(bodyBytes); err != nil {
return nil, nil, err
} else if len(inlineAtts) > 0 {
// we found some inline attachments, so merge them with attachments, and update the bodies
attachments = mergeAttachments(inlineAtts, attachments)
bodyBytes = cleanBodyBytes
body = cleanBody
}

return bodyBytes, body, attachments, nil
return bodyBytes, attachments, nil
}

// mergeAttachments copies the docAttachments map, and merges pre25Attachments into it.
Expand Down Expand Up @@ -705,7 +703,7 @@ func (db *DatabaseCollectionWithUser) get1xRevFromDoc(ctx context.Context, doc *
return nil, false, ErrDeleted
}
}
if bodyBytes, _, attachments, err = db.getRevision(ctx, doc, revid); err != nil {
if bodyBytes, attachments, err = db.getRevision(ctx, doc, revid); err != nil {
return nil, false, err
}
}
Expand Down Expand Up @@ -742,7 +740,7 @@ func (db *DatabaseCollectionWithUser) get1xRevFromDoc(ctx context.Context, doc *
// Returns the body and rev ID of the asked-for revision or the most recent available ancestor.
func (db *DatabaseCollectionWithUser) getAvailableRev(ctx context.Context, doc *Document, revid string) ([]byte, string, AttachmentsMeta, error) {
for ; revid != ""; revid = doc.History[revid].Parent {
if bodyBytes, _, attachments, _ := db.getRevision(ctx, doc, revid); bodyBytes != nil {
if bodyBytes, attachments, _ := db.getRevision(ctx, doc, revid); bodyBytes != nil {
return bodyBytes, revid, attachments, nil
}
}
Expand Down Expand Up @@ -2155,15 +2153,14 @@ func (db *DatabaseCollectionWithUser) updateAndReturnDoc(ctx context.Context, do

revChannels := doc.History[newRevID].Channels
documentRevision := DocumentRevision{
DocID: docid,
RevID: newRevID,
BodyBytes: storedDocBytes,
History: encodeRevisions(ctx, docid, history),
Channels: revChannels,
Attachments: doc.Attachments,
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
_shallowCopyBody: storedDoc.Body(ctx),
DocID: docid,
RevID: newRevID,
BodyBytes: storedDocBytes,
History: encodeRevisions(ctx, docid, history),
Channels: revChannels,
Attachments: doc.Attachments,
Expiry: doc.Expiry,
Deleted: doc.History[newRevID].Deleted,
}

if createNewRevIDSkipped {
Expand Down Expand Up @@ -2261,7 +2258,7 @@ func getAttachmentIDsForLeafRevisions(ctx context.Context, db *DatabaseCollectio
})

for _, leafRevision := range documentLeafRevisions {
_, _, attachmentMeta, err := db.getRevision(ctx, doc, leafRevision)
_, attachmentMeta, err := db.getRevision(ctx, doc, leafRevision)
if err != nil {
return nil, err
}
Expand Down
88 changes: 81 additions & 7 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,13 @@ func TestGetRemovedAsUser(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -514,9 +519,11 @@ func TestGetRemovalMultiChannel(t *testing.T) {
_, rev1Digest := ParseRevID(ctx, rev1ID)
_, rev2Digest := ParseRevID(ctx, rev2ID)

var interfaceListChannels []interface{}
interfaceListChannels = append(interfaceListChannels, "ABC")
bodyExpected := Body{
"k2": "v2",
"channels": []string{"ABC"},
"channels": interfaceListChannels,
BodyRevisions: Revisions{
RevisionsStart: 2,
RevisionsIds: []string{rev2Digest, rev1Digest},
Expand Down Expand Up @@ -752,8 +759,13 @@ func TestGetRemoved(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStat)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -821,8 +833,13 @@ func TestGetRemovedAndDeleted(t *testing.T) {
// Manually remove the temporary backup doc from the bucket
// Manually flush the rev cache
// After expiry from the rev cache and removal of doc backup, try again
cacheHitCounter, cacheMissCounter := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(DefaultRevisionCacheShardCount, DefaultRevisionCacheSize, backingStoreMap, cacheHitCounter, cacheMissCounter)
cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats := db.DatabaseContext.DbStats.Cache().RevisionCacheHits, db.DatabaseContext.DbStats.Cache().RevisionCacheMisses, db.DatabaseContext.DbStats.Cache().RevisionCacheNumItems, db.DatabaseContext.DbStats.Cache().RevisionCacheTotalMemory
cacheOptions := &RevisionCacheOptions{
MaxBytes: 0,
MaxItemCount: DefaultRevisionCacheSize,
ShardCount: DefaultRevisionCacheShardCount,
}
collection.dbCtx.revisionCache = NewShardedLRURevisionCache(cacheOptions, backingStoreMap, cacheHitCounter, cacheMissCounter, cacheNumItems, memoryCacheStats)
err = collection.PurgeOldRevisionJSON(ctx, "doc1", rev2id)
assert.NoError(t, err, "Purge old revision JSON")

Expand Down Expand Up @@ -3321,3 +3338,60 @@ func TestBadDCPStart(t *testing.T) {

dbCtx.Close(ctx)
}

func TestInject1xBodyProperties(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

collection, ctx := GetSingleDatabaseCollectionWithUser(ctx, t, db)

rev1ID, _, err := collection.Put(ctx, "doc", Body{"test": "doc"})
require.NoError(t, err)
var rev2Body Body
rev2Data := `{"key":"value", "_attachments": {"hello.txt": {"data":"aGVsbG8gd29ybGQ="}}}`
require.NoError(t, base.JSONUnmarshal([]byte(rev2Data), &rev2Body))
_, rev2ID, err := collection.PutExistingRevWithBody(ctx, "doc", rev2Body, []string{"2-abc", rev1ID}, true)
require.NoError(t, err)

docRev, err := collection.GetRev(ctx, "doc", rev2ID, true, nil)
require.NoError(t, err)

// mock expiry on doc
exp := time.Now()
docRev.Expiry = &exp

newDoc, err := docRev.Inject1xBodyProperties(ctx, collection, docRev.History, nil, true)
require.NoError(t, err)
var resBody Body
require.NoError(t, resBody.Unmarshal(newDoc))

// cast to map of interface given we have injected the properties runtime has no concept of the AttachmentMeta and Revisions types
revs := resBody[BodyRevisions].(map[string]interface{})
atts := resBody[BodyAttachments].(map[string]interface{})

assert.NotNil(t, atts)
assert.NotNil(t, revs)
assert.Equal(t, "doc", resBody[BodyId])
assert.Equal(t, "2-abc", resBody[BodyRev])
assert.Equal(t, exp.Format(time.RFC3339), resBody[BodyExpiry])
assert.Equal(t, "value", resBody["key"])

// mock doc deleted
docRev.Deleted = true

newDoc, err = docRev.Inject1xBodyProperties(ctx, collection, docRev.History, []string{"2-abc"}, true)
require.NoError(t, err)
require.NoError(t, resBody.Unmarshal(newDoc))

// cast to map of interface given we have injected the properties runtime has no concept of the AttachmentMeta and Revisions types
revs = resBody[BodyRevisions].(map[string]interface{})
atts = resBody[BodyAttachments].(map[string]interface{})

assert.NotNil(t, atts)
assert.NotNil(t, revs)
assert.Equal(t, "doc", resBody[BodyId])
assert.Equal(t, "2-abc", resBody[BodyRev])
assert.Equal(t, exp.Format(time.RFC3339), resBody[BodyExpiry])
assert.Equal(t, "value", resBody["key"])
assert.True(t, resBody[BodyDeleted].(bool))
}
Loading

0 comments on commit f682b48

Please sign in to comment.