[FIXED] ss.Last was not kept up-to-date
Signed-off-by: Maurice van Veen <[email protected]>
MauriceVanVeen authored and wallyqs committed Jan 8, 2025
1 parent 45b4857 commit 240fce9
Showing 4 changed files with 150 additions and 71 deletions.
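The change in a nutshell: SimpleState already carried a firstNeedsUpdate flag so ss.First could be repaired lazily after message removals, but ss.Last had no such flag and could go stale. This commit adds lastNeedsUpdate and replaces recalculateFirstForSubj with recalculateForSubj, which repairs whichever bound is dirty. A minimal, self-contained sketch of that bookkeeping (illustrative names and a plain map standing in for the real stores, not the server code):

    package main

    import "fmt"

    type simpleState struct {
        Msgs, First, Last uint64
        firstNeedsUpdate  bool
        lastNeedsUpdate   bool
    }

    // removeSeq mirrors removeSeqPerSubject: it only marks the bounds dirty.
    func removeSeq(ss *simpleState, seq uint64) {
        ss.Msgs--
        // Lazily recalculate the first/last sequence when needed.
        ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
        ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
    }

    // recalculate repairs the dirty bounds by scanning forward/backward,
    // the same idea as recalculateForSubj in memstore.go and filestore.go.
    func recalculate(msgs map[uint64]string, subj string, ss *simpleState) {
        if ss.firstNeedsUpdate {
            for seq := ss.First + 1; seq <= ss.Last; seq++ {
                if msgs[seq] == subj {
                    ss.First, ss.firstNeedsUpdate = seq, false
                    break
                }
            }
        }
        if ss.lastNeedsUpdate {
            for seq := ss.Last - 1; seq >= ss.First; seq-- {
                if msgs[seq] == subj {
                    ss.Last, ss.lastNeedsUpdate = seq, false
                    break
                }
            }
        }
    }

    func main() {
        msgs := map[uint64]string{3: "foo", 7: "foo", 9: "foo"}
        ss := &simpleState{Msgs: 3, First: 3, Last: 9}

        delete(msgs, 9)
        removeSeq(ss, 9) // seq == ss.Last, so ss.Last is now stale

        recalculate(msgs, "foo", ss)
        fmt.Printf("%+v\n", *ss) // {Msgs:2 First:3 Last:7 firstNeedsUpdate:false lastNeedsUpdate:false}
    }

Before this fix only the First half of that repair existed, so a consumer reading ss.Last after deleting the newest message for a subject could observe a sequence that no longer holds a message for that subject.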
145 changes: 100 additions & 45 deletions server/filestore.go
@@ -2315,8 +2315,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
     fseq = lseq + 1
     for _, subj := range subs {
         ss, _ := mb.fss.Find(stringToBytes(subj))
-        if ss != nil && ss.firstNeedsUpdate {
-            mb.recalculateFirstForSubj(subj, ss.First, ss)
+        if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
+            mb.recalculateForSubj(subj, ss)
         }
         if ss == nil || start > ss.Last || ss.First >= fseq {
             continue
@@ -2445,8 +2445,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
             // If we already found a partial then don't do anything else.
             return
         }
-        if ss.firstNeedsUpdate {
-            mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
+        if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+            mb.recalculateForSubj(bytesToString(bsubj), ss)
         }
         if sseq <= ss.First {
             update(ss)
@@ -2745,8 +2745,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
         mb.lsts = time.Now().UnixNano()
         mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
             subj := string(bsubj)
-            if ss.firstNeedsUpdate {
-                mb.recalculateFirstForSubj(subj, ss.First, ss)
+            if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+                mb.recalculateForSubj(subj, ss)
             }
             oss := fss[subj]
             if oss.First == 0 { // New
@@ -2936,8 +2936,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
                 return
             }
             subj := bytesToString(bsubj)
-            if ss.firstNeedsUpdate {
-                mb.recalculateFirstForSubj(subj, ss.First, ss)
+            if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+                mb.recalculateForSubj(subj, ss)
             }
             if sseq <= ss.First {
                 t += ss.Msgs
@@ -3224,8 +3224,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
                 // If we already found a partial then don't do anything else.
                 return
             }
-            if ss.firstNeedsUpdate {
-                mb.recalculateFirstForSubj(subj, ss.First, ss)
+            if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+                mb.recalculateForSubj(subj, ss)
             }
             if sseq <= ss.First {
                 t += ss.Msgs
@@ -3898,8 +3898,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
                 info.fblk = i
             }
         }
-        if ss.firstNeedsUpdate {
-            mb.recalculateFirstForSubj(subj, ss.First, ss)
+        if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+            mb.recalculateForSubj(subj, ss)
         }
         mb.mu.Unlock()
         // Re-acquire fs lock
@@ -4030,8 +4030,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
        mb.mu.Lock()
        mb.ensurePerSubjectInfoLoaded()
        ss, ok := mb.fss.Find(stringToBytes(subj))
-        if ok && ss != nil && ss.firstNeedsUpdate {
-            mb.recalculateFirstForSubj(subj, ss.First, ss)
+        if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
+            mb.recalculateForSubj(subj, ss)
         }
         mb.mu.Unlock()
         if ss == nil {
@@ -7832,60 +7832,115 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {

     ss.Msgs--

-    // We can lazily calculate the first sequence when needed.
+    // We can lazily calculate the first/last sequence when needed.
     ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
+    ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
 }

-// Will recalulate the first sequence for this subject in this block.
+// Will recalculate the first and/or last sequence for this subject in this block.
 // Will avoid slower path message lookups and scan the cache directly instead.
-func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
+func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
     // Need to make sure messages are loaded.
     if mb.cacheNotLoaded() {
         if err := mb.loadMsgsWithLock(); err != nil {
             return
         }
     }

-    // Mark first as updated.
-    ss.firstNeedsUpdate = false
-
-    startSlot := int(startSeq - mb.cache.fseq)
+    startSlot := int(ss.First - mb.cache.fseq)
+    if startSlot < 0 {
+        startSlot = 0
+    }
     if startSlot >= len(mb.cache.idx) {
         ss.First = ss.Last
         return
-    } else if startSlot < 0 {
-        startSlot = 0
     }

-    fseq := startSeq + 1
-    if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
-        fseq = mbFseq
+    endSlot := int(ss.Last - mb.cache.fseq)
+    if endSlot < 0 {
+        endSlot = 0
     }
+    if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
+        return
+    }

     var le = binary.LittleEndian
-    for slot := startSlot; slot < len(mb.cache.idx); slot++ {
-        bi := mb.cache.idx[slot] &^ hbit
-        if bi == dbit {
-            // delete marker so skip.
-            continue
-        }
-        li := int(bi) - mb.cache.off
-        if li >= len(mb.cache.buf) {
-            ss.First = ss.Last
-            return
-        }
-        buf := mb.cache.buf[li:]
-        hdr := buf[:msgHdrSize]
-        slen := int(le.Uint16(hdr[20:]))
-        if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
-            seq := le.Uint64(hdr[4:])
-            if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
-                continue
-            }
-            ss.First = seq
-            if ss.Msgs == 1 {
-                ss.Last = seq
-            }
-            return
-        }
-    }
+    if ss.firstNeedsUpdate {
+        // Mark first as updated.
+        ss.firstNeedsUpdate = false
+
+        fseq := ss.First + 1
+        if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
+            fseq = mbFseq
+        }
+        for slot := startSlot; slot < len(mb.cache.idx); slot++ {
+            bi := mb.cache.idx[slot] &^ hbit
+            if bi == dbit {
+                // delete marker so skip.
+                continue
+            }
+            li := int(bi) - mb.cache.off
+            if li >= len(mb.cache.buf) {
+                ss.First = ss.Last
+                return
+            }
+            buf := mb.cache.buf[li:]
+            hdr := buf[:msgHdrSize]
+            slen := int(le.Uint16(hdr[20:]))
+            if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
+                seq := le.Uint64(hdr[4:])
+                if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
+                    continue
+                }
+                ss.First = seq
+                if ss.Msgs == 1 {
+                    ss.Last = seq
+                    ss.lastNeedsUpdate = false
+                    return
+                }
+                // Skip the start slot ahead, if we need to recalculate last we can stop early.
+                startSlot = slot
+                break
+            }
+        }
+    }
+    if ss.lastNeedsUpdate {
+        // Mark last as updated.
+        ss.lastNeedsUpdate = false
+
+        lseq := ss.Last - 1
+        if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
+            lseq = mbLseq
+        }
+        for slot := endSlot; slot >= startSlot; slot-- {
+            bi := mb.cache.idx[slot] &^ hbit
+            if bi == dbit {
+                // delete marker so skip.
+                continue
+            }
+            li := int(bi) - mb.cache.off
+            if li >= len(mb.cache.buf) {
+                // Can't overwrite ss.Last, just skip.
+                return
+            }
+            buf := mb.cache.buf[li:]
+            hdr := buf[:msgHdrSize]
+            slen := int(le.Uint16(hdr[20:]))
+            if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
+                seq := le.Uint64(hdr[4:])
+                if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
+                    continue
+                }
+                // Sequence should never be lower, but guard against it nonetheless.
+                if seq < ss.First {
+                    seq = ss.First
+                }
+                ss.Last = seq
+                if ss.Msgs == 1 {
+                    ss.First = seq
+                    ss.firstNeedsUpdate = false
+                }
+                return
+            }
+        }
+    }
 }
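The slot arithmetic above maps sequences into the cached index before either scan runs; a quick illustration with invented numbers:

    // Illustration only, numbers invented:
    //   mb.cache.fseq == 100, len(mb.cache.idx) == 50
    //   ss.First == 105  =>  startSlot = 105 - 100 = 5
    //   ss.Last  == 140  =>  endSlot   = 140 - 100 = 40
    // Both slots land inside the index, so the forward scan for First starts
    // at slot 5 and the backward scan for Last starts at slot 40; if either
    // bound fell outside the index the function would bail out early instead.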
2 changes: 1 addition & 1 deletion server/filestore_test.go
@@ -5030,7 +5030,7 @@ func TestFileStoreRecaluclateFirstForSubjBug(t *testing.T) {
     mb.clearCacheAndOffset()
     // Now call with start sequence of 1, the old one
     // This will panic without the fix.
-    mb.recalculateFirstForSubj("foo", 1, ss)
+    mb.recalculateForSubj("foo", ss)
     // Make sure it was update properly.
     require_True(t, *ss == SimpleState{Msgs: 1, First: 3, Last: 3, firstNeedsUpdate: false})
 }
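The assertion above still only pins down the First recalculation. A sketch of the complementary check for Last, in the same style (hypothetical setup, not part of this commit: assumes the newest "foo" message, say sequence 3, was just removed and only sequence 1 remains in the block):

    // Hypothetical follow-up assertion, mirroring the test above.
    mb.recalculateForSubj("foo", ss)
    require_True(t, *ss == SimpleState{Msgs: 1, First: 1, Last: 1, lastNeedsUpdate: false})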
72 changes: 47 additions & 25 deletions server/memstore.go
@@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
             return ErrMaxBytes
         }
         // If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
-        if ss.firstNeedsUpdate {
-            ms.recalculateFirstForSubj(subj, ss.First, ss)
+        if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+            ms.recalculateForSubj(subj, ss)
         }
         sm, ok := ms.msgs[ss.First]
         if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
@@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
     var totalSkipped uint64
     // We will track start and end sequences as we go.
     ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
-        if fss.firstNeedsUpdate {
-            ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
+        if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
+            ms.recalculateForSubj(bytesToString(subj), fss)
         }
         if sseq <= fss.First {
             update(fss)
@@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
     fss := make(map[string]SimpleState)
     ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
         subjs := string(subj)
-        if ss.firstNeedsUpdate {
-            ms.recalculateFirstForSubj(subjs, ss.First, ss)
+        if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+            ms.recalculateForSubj(subjs, ss)
         }
         oss := fss[subjs]
         if oss.First == 0 { // New
@@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
     var totalSkipped uint64
     // We will track start and end sequences as we go.
     IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
-        if fss.firstNeedsUpdate {
-            ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
+        if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
+            ms.recalculateForSubj(bytesToString(subj), fss)
         }
         if sseq <= fss.First {
             update(fss)
@@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
         return
     }
     for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
-        if ss.firstNeedsUpdate {
-            ms.recalculateFirstForSubj(subj, ss.First, ss)
+        if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+            ms.recalculateForSubj(subj, ss)
         }
         if !ms.removeMsg(ss.First, false) {
             break
@@ -1267,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
             if !ok {
                 continue
             }
-            if ss.firstNeedsUpdate {
-                ms.recalculateFirstForSubj(subj, ss.First, ss)
+            if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
+                ms.recalculateForSubj(subj, ss)
             }
             if ss.First < fseq {
                 fseq = ss.First
@@ -1362,25 +1362,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
     }
     ss.Msgs--

-    // We can lazily calculate the first sequence when needed.
+    // We can lazily calculate the first/last sequence when needed.
     ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
+    ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
 }

-// Will recalculate the first sequence for this subject in this block.
+// Will recalculate the first and/or last sequence for this subject.
 // Lock should be held.
-func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
-    tseq := startSeq + 1
-    if tseq < ms.state.FirstSeq {
-        tseq = ms.state.FirstSeq
-    }
-    for ; tseq <= ss.Last; tseq++ {
-        if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
-            ss.First = tseq
-            if ss.Msgs == 1 {
-                ss.Last = tseq
-            }
-            ss.firstNeedsUpdate = false
-            return
-        }
-    }
-}
+func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
+    if ss.firstNeedsUpdate {
+        tseq := ss.First + 1
+        if tseq < ms.state.FirstSeq {
+            tseq = ms.state.FirstSeq
+        }
+        for ; tseq <= ss.Last; tseq++ {
+            if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
+                ss.First = tseq
+                ss.firstNeedsUpdate = false
+                if ss.Msgs == 1 {
+                    ss.Last = tseq
+                    ss.lastNeedsUpdate = false
+                    return
+                }
+                break
+            }
+        }
+    }
+    if ss.lastNeedsUpdate {
+        tseq := ss.Last - 1
+        if tseq > ms.state.LastSeq {
+            tseq = ms.state.LastSeq
+        }
+        for ; tseq >= ss.First; tseq-- {
+            if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
+                ss.Last = tseq
+                ss.lastNeedsUpdate = false
+                if ss.Msgs == 1 {
+                    ss.First = tseq
+                    ss.firstNeedsUpdate = false
+                }
+                return
+            }
+        }
+    }
+}
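A short walk-through of the new backward scan, with invented sequences:

    // Illustration only:
    //   "foo" has messages at sequences {4, 8, 10}, ms.state.LastSeq == 12
    //   removeSeqPerSubject("foo", 10)  // seq == ss.Last, so lastNeedsUpdate = true
    //   recalculateForSubj("foo", ss)   // tseq starts at 9 (ss.Last-1, capped at LastSeq)
    //                                   // 9 holds no "foo" message, 8 does
    //                                   // => ss.Last = 8, lastNeedsUpdate = false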
2 changes: 2 additions & 0 deletions server/store.go
@@ -166,6 +166,8 @@ type SimpleState struct {

     // Internal usage for when the first needs to be updated before use.
     firstNeedsUpdate bool
+    // Internal usage for when the last needs to be updated before use.
+    lastNeedsUpdate bool
 }

 // LostStreamData indicates msgs that have been lost.
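Callers throughout this commit guard reads of the lazily maintained bounds with the same check before trusting them; the pattern, as it appears at each call site in this diff (filestore shown, memstore is identical apart from the receiver):

    if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
        mb.recalculateForSubj(subj, ss)
    }
    // Only after this are ss.First and ss.Last safe to use.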
