Skip to content

Commit

Permalink
CBG-4221 [3.1.11 backport] Don't increment high sequence cached for u…
Browse files Browse the repository at this point in the history
…nused sequences on pendingLogs (#7106)

CBG-4221 [3.1.11 backport] Don't increment high sequence cached for unused sequences on pendingLogs

If unused sequences ranges are pushed to pendingLogs, they shouldn't increment the channel cache's highSequenceCached until they are processed.
  • Loading branch information
adamcfraser authored Sep 4, 2024
1 parent b6127c1 commit 5ba7cd0
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 10 deletions.
8 changes: 2 additions & 6 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
} else {
changedChannels.Add(unusedSeq)
}
c.channelCache.AddUnusedSequence(change)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
}
Expand All @@ -598,7 +597,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
}
changedChannels := c.processEntry(ctx, change)
allChangedChannels = allChangedChannels.Update(changedChannels)
c.channelCache.AddUnusedSequence(change)
if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
}
Expand All @@ -608,9 +606,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
// push unused range to either pending or skipped lists based on current state of the change cache
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)

// update high seq cached
c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence})

if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
}
Expand Down Expand Up @@ -803,8 +798,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann
}
delete(c.receivedSeqs, change.Sequence)

// If unused sequence or principal, we're done after updating sequence
// If unused sequence, notify the cache and return
if change.DocID == "" {
c.channelCache.AddUnusedSequence(change)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2538,7 +2538,7 @@ func TestReleasedSequenceRangeHandlingEverythingPending(t *testing.T) {
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, uint64(2), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
assert.Equal(c, int64(25), dbContext.DbStats.CacheStats.HighSeqCached.Value())
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.HighSeqCached.Value())
}, time.Second*10, time.Millisecond*100)
}

Expand Down Expand Up @@ -2644,7 +2644,7 @@ func TestReleasedSequenceRangeHandlingEverythingPendingLowPendingCapacity(t *tes
defer testChangeCache.Stop(ctx)
require.NoError(t, err)

// process unused sequence range
// process unused sequence range, will be sent to pending. Triggers seq 1 being sent to skipped
testChangeCache.releaseUnusedSequenceRange(ctx, 2, 25, time.Now())

require.EventuallyWithT(t, func(c *assert.CollectT) {
Expand Down Expand Up @@ -2755,7 +2755,7 @@ func TestReleasedSequenceRangeHandlingSingleSequence(t *testing.T) {
assert.Equal(c, int64(1), dbContext.DbStats.CacheStats.PendingSeqLen.Value())
assert.Equal(c, uint64(1), testChangeCache.nextSequence)
dbContext.UpdateCalculatedStats(ctx)
assert.Equal(c, int64(2), dbContext.DbStats.CacheStats.HighSeqCached.Value())
assert.Equal(c, int64(0), dbContext.DbStats.CacheStats.HighSeqCached.Value())
}, time.Second*10, time.Millisecond*100)

// process change that should overload pending and push sequence 1 to skipped
Expand Down
6 changes: 5 additions & 1 deletion db/channel_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func (c *channelCacheImpl) AddPrincipal(change *LogEntry) {

// Add unused Sequence notifies the cache of an unused sequence update. Updates the cache's high sequence
func (c *channelCacheImpl) AddUnusedSequence(change *LogEntry) {
c.updateHighCacheSequence(change.Sequence)
if change.EndSequence > 0 {
c.updateHighCacheSequence(change.EndSequence)
} else {
c.updateHighCacheSequence(change.Sequence)
}
}

// Adds an entry to the appropriate channels' caches, returning the affected channels. lateSequence
Expand Down

0 comments on commit 5ba7cd0

Please sign in to comment.