
fix(storage): Remove the memory leak from watchDispatcher.
While watchDispatcher no longer grows unboundedly, it still can't be torn down trivially.
This is fixable, but a fix would leak more complexity into the storage layer.
ttimonen authored Jul 24, 2024
1 parent da4bc64 commit 50034ba
Showing 2 changed files with 38 additions and 18 deletions.
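In outline, the commit gives the dispatcher a gcCh channel and a background gcer goroutine; each Register call now also spawns a goroutine that waits on the watcher's context and, once it is cancelled, asks gcer to sweep that key. Below is a minimal, self-contained sketch of that pattern, not the repo's actual code: a plain map guarded by a mutex stands in for xsync.MapOf, and dispatcher/register/newDispatcher are illustrative names. The real diff follows.

// Minimal standalone sketch of the reclamation pattern this commit introduces.
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type watcher struct{ ctx context.Context }

type dispatcher struct {
    mu           sync.Mutex
    watchesByKey map[string][]*watcher
    gcCh         chan string
}

func newDispatcher() *dispatcher {
    d := &dispatcher{watchesByKey: map[string][]*watcher{}, gcCh: make(chan string)}
    go d.gcer() // background goroutine; lives for the process lifetime (the leak the new TODO mentions)
    return d
}

func (d *dispatcher) register(key string, w *watcher) {
    d.mu.Lock()
    d.watchesByKey[key] = append(d.watchesByKey[key], w)
    d.mu.Unlock()
    go func() {
        <-w.ctx.Done() // wait for the watch to be stopped...
        d.gcCh <- key  // ...then ask gcer to sweep this key
    }()
}

func (d *dispatcher) gcer() {
    for key := range d.gcCh {
        d.mu.Lock()
        var live []*watcher
        for _, w := range d.watchesByKey[key] {
            if w.ctx.Err() == nil { // keep only watchers whose context is still alive
                live = append(live, w)
            }
        }
        if len(live) == 0 {
            delete(d.watchesByKey, key) // drop the key entirely, so the map stops growing
        } else {
            d.watchesByKey[key] = live
        }
        d.mu.Unlock()
    }
}

func main() {
    d := newDispatcher()
    ctx, cancel := context.WithCancel(context.Background())
    d.register("/some/key", &watcher{ctx: ctx})
    cancel()                          // stopping the watch triggers cleanup
    time.Sleep(50 * time.Millisecond) // crude; a real test would synchronize properly
    d.mu.Lock()
    fmt.Println(len(d.watchesByKey)) // 0
    d.mu.Unlock()
}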
pkg/registry/file/storage.go (2 changes: 1 addition & 1 deletion)

@@ -50,7 +50,7 @@ type StorageImpl struct {
 	processor       Processor
 	root            string
 	versioner       storage.Versioner
-	watchDispatcher watchDispatcher
+	watchDispatcher *watchDispatcher
 }

 // StorageQuerier wraps the storage.Interface and adds some extra methods which are used by the storage implementation.
pkg/registry/file/watch.go (54 changes: 37 additions & 17 deletions)
@@ -121,17 +121,16 @@ type watchersList []*watcher

 // watchDispatcher dispatches events to registered watches
 //
-// TODO(vladklokun): Please keep in mind that this dispatcher does not offer any collection of
-// resources left over by the stopped watches! There are multiple ways to go about it:
-// 1. On-Stop cleanup, where a watcher notifies the dispatcher about being stopped, so that it can be cleaned up
-// 2. Garbage collection. Periodic background cleanup. Poses challenges as this
-//    might be a contended concurrent resource.
+// TODO(ttimonen): There's currently no way to gracefully take down watchDispatcher without leaking a goroutine.
 type watchDispatcher struct {
 	watchesByKey *xsync.MapOf[string, watchersList]
+	gcCh         chan string
 }

-func newWatchDispatcher() watchDispatcher {
-	return watchDispatcher{xsync.NewMapOf[watchersList]()}
+func newWatchDispatcher() *watchDispatcher {
+	wd := watchDispatcher{xsync.NewMapOf[watchersList](), make(chan string)}
+	go wd.gcer()
+	return &wd
 }

func extractKeysToNotify(key string) []string {
@@ -152,6 +151,30 @@ func (wd *watchDispatcher) Register(key string, w *watcher) {
 	wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) {
 		return append(l, w), false
 	})
+	go func() {
+		<-w.ctx.Done()
+		wd.gcCh <- key
+	}()
 }

+func (wd *watchDispatcher) gcer() {
+	for key := range wd.gcCh { // This is an O(n) op, where n is the # of watchers on a particular key.
+		wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) {
+			if len(l) == 0 {
+				return nil, true
+			}
+			out := make(watchersList, 0, len(l)-1) // Preallocate with the intent to drop one element.
+			// Dropping elements in place would be more efficient alloc-wise, but would cause data races
+			// on notify the way it's currently implemented.
+			for _, w := range l {
+				if w.ctx.Err() == nil {
+					out = append(out, w)
+				}
+			}
+			return out, len(out) == 0
+		})
+		// TODO(ttimonen) sleeping a bit here could give a batch cleanup improvement, maybe.
+	}
+}
+
 // Added dispatches an "Added" event to appropriate watchers
@@ -171,15 +194,12 @@ func (wd *watchDispatcher) Modified(key string, obj runtime.Object) {

 // notify notifies the listeners of a given key about an event of a given eventType about a given obj
 func (wd *watchDispatcher) notify(key string, eventType watch.EventType, obj runtime.Object) {
-	// Don't block callers by publishing in a separate goroutine
-	// TODO(ttimonen) This is a kind of expensive way to manage a queue, yet watchers might block each other.
-	go func() {
-		event := watch.Event{Type: eventType, Object: obj}
-		for _, part := range extractKeysToNotify(key) {
-			ws, _ := wd.watchesByKey.Load(part)
-			for _, w := range ws {
-				w.notify(event)
-			}
-		}
-	}()
+	// Notify calls do not block normally, unless the client side is messed up.
+	event := watch.Event{Type: eventType, Object: obj}
+	for _, part := range extractKeysToNotify(key) {
+		ws, _ := wd.watchesByKey.Load(part)
+		for _, w := range ws {
+			w.notify(event)
+		}
+	}
 }
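The new TODO(ttimonen) and the commit message both note the remaining leak: nothing ever closes gcCh, so the gcer goroutine outlives any single dispatcher. A hedged sketch of one possible teardown follows, reusing the standalone model from above; the wg field and the stop method are hypothetical and not part of this commit.

// Hypothetical graceful teardown — NOT in the commit. Illustrates what the
// commit message calls "fixable, but a fix would leak more complexity into
// the storage layer".
package dispatch

import (
    "context"
    "sync"
)

type watcher struct{ ctx context.Context }

type dispatcher struct {
    mu           sync.Mutex
    watchesByKey map[string][]*watcher
    gcCh         chan string
    wg           sync.WaitGroup // hypothetical: tracks the per-watcher goroutines
}

func (d *dispatcher) register(key string, w *watcher) {
    d.mu.Lock()
    d.watchesByKey[key] = append(d.watchesByKey[key], w)
    d.mu.Unlock()
    d.wg.Add(1)
    go func() {
        defer d.wg.Done() // runs only after gcer has received the key
        <-w.ctx.Done()
        d.gcCh <- key
    }()
}

func (d *dispatcher) gcer() {
    for key := range d.gcCh { // terminates once stop() closes gcCh
        d.mu.Lock()
        var live []*watcher
        for _, w := range d.watchesByKey[key] {
            if w.ctx.Err() == nil {
                live = append(live, w)
            }
        }
        if len(live) == 0 {
            delete(d.watchesByKey, key)
        } else {
            d.watchesByKey[key] = live
        }
        d.mu.Unlock()
    }
}

// stop blocks until every per-watcher goroutine has handed its key to gcer,
// then closes gcCh so the gcer goroutine exits instead of leaking.
func (d *dispatcher) stop() {
    d.wg.Wait()
    close(d.gcCh)
}

The complexity the commit message warns about shows up in the caller: stop only returns once every watcher context has been cancelled, so the storage layer would have to own and cancel those contexts on shutdown.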
