Skip to content

Commit

Permalink
Fix issue with manipulating the tree in Ascend
Browse files Browse the repository at this point in the history
Co-authored-by: Stefan Bueringer <[email protected]>
  • Loading branch information
alvaroaleman and sbueringer committed Jan 7, 2025
1 parent 6e71ef7 commit 71f0c98
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
9 changes: 8 additions & 1 deletion pkg/controller/priorityqueue/priorityqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (w *priorityqueue[T]) spin() {
w.lockedLock.Lock()
defer w.lockedLock.Unlock()

// manipulating the tree from within Ascend might lead to panics, so
// track what we want to delete and do it after we are done ascending.
var toDelete []*item[T]
w.queue.Ascend(func(item *item[T]) bool {
if item.readyAt != nil {
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
Expand Down Expand Up @@ -230,12 +233,16 @@ func (w *priorityqueue[T]) spin() {
w.locked.Insert(item.key)
w.waiters.Add(-1)
delete(w.items, item.key)
w.queue.Delete(item)
toDelete = append(toDelete, item)
w.becameReady.Delete(item.key)
w.get <- *item

return true
})

for _, item := range toDelete {
w.queue.Delete(item)
}
}()
}
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/controller/priorityqueue/priorityqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package priorityqueue

import (
"fmt"
"math/rand/v2"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -344,6 +345,40 @@ var _ = Describe("Controllerworkqueue", func() {
Expect(metrics.depth["test"]).To(Equal(4))
metrics.mu.Unlock()
})

It("returns many items", func() {
// This test ensures the queue is able to drain a large queue without panic'ing.
// In a previous version of the code we were calling queue.Delete within q.Ascend
// which led to a panic in queue.Ascend > iterate:
// "panic: runtime error: index out of range [0] with length 0"
q, _ := newQueue()
defer q.ShutDown()

for range 20 {
for i := range 1000 {
rn := rand.N(100) //nolint:gosec // We don't need cryptographically secure entropy here
if rn < 10 {
q.AddWithOpts(AddOpts{After: time.Duration(rn) * time.Millisecond}, fmt.Sprintf("foo%d", i))
} else {
q.AddWithOpts(AddOpts{Priority: rn}, fmt.Sprintf("foo%d", i))
}
}

wg := sync.WaitGroup{}
for range 100 { // The panic only occurred relatively frequently with a high number of go routines.
wg.Add(1)
go func() {
defer wg.Done()
for range 10 {
obj, _, _ := q.GetWithPriority()
q.Done(obj)
}
}()
}

wg.Wait()
}
})
})

func BenchmarkAddGetDone(b *testing.B) {
Expand Down

0 comments on commit 71f0c98

Please sign in to comment.