From bc54a6afef24843d9f7664250c7ed0f3e5c44392 Mon Sep 17 00:00:00 2001 From: ttimonen Date: Mon, 22 Jul 2024 09:46:33 +0000 Subject: [PATCH 1/3] refactor(watcher_test) Simplify the watcher testcase handling This is a prequel refactoring that helps to lure out the deadlocks in the watcher later. Signed-off-by: ttimonen --- pkg/registry/file/watch_test.go | 326 +++++++++----------------------- 1 file changed, 91 insertions(+), 235 deletions(-) diff --git a/pkg/registry/file/watch_test.go b/pkg/registry/file/watch_test.go index 760c42e99..0ddaacc7b 100644 --- a/pkg/registry/file/watch_test.go +++ b/pkg/registry/file/watch_test.go @@ -93,128 +93,64 @@ func TestFileSystemStorageWatchReturnsDistinctWatchers(t *testing.T) { } } -func TestFilesystemStoragePublishesToMatchingWatch(t *testing.T) { +func TestFilesystemStorageWatchPublishing(t *testing.T) { + var ( + keyN = "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape" + keyK = "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape" + obj = &v1beta1.SBOMSPDXv2p3{ObjectMeta: v1.ObjectMeta{ + Name: "some-sbom", + ResourceVersion: "1", + }} + ) tt := []struct { - name string - inputWatchesByKey map[string]int - inputObjects map[string]*v1beta1.SBOMSPDXv2p3 - expectedEvents map[string][]watch.Event - }{ - { - name: "Create should publish to the appropriate single channel", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": 1, - }, - inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape/some-sbom": { - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - }, - }, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": { - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - }, - }, + name string + start, stopBefore, stopAfter map[string]int + inputObjects map[string]*v1beta1.SBOMSPDXv2p3 + want map[string][]watch.Event + }{{ + name: "Create should publish to the appropriate single channel", + start: map[string]int{keyK: 1}, + inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ + keyK + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, }, - { - name: "Create should publish to all watchers on the relevant key", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": 3, - }, - inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape/some-sbom": { - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - }, - }, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": { - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - }, - }, + want: map[string][]watch.Event{keyK: {{Type: watch.Added, Object: obj}}}, + }, { + name: "Create should publish to all watchers on the relevant key", + start: map[string]int{keyK: 3}, + inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ + keyK + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: 
"some-sbom"}}, }, - { - name: "Creating on key different than the watch should produce no event", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": 3, - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": 1, - }, - inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape/some-sbom": { - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - }, - }, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": { - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - }, - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": {}, - }, + want: map[string][]watch.Event{keyK: { + {Type: watch.Added, Object: obj}, + {Type: watch.Added, Object: obj}, + {Type: watch.Added, Object: obj}, + }}, + }, { + name: "Creating on key different than the watch should produce no event", + start: map[string]int{keyK: 3, keyN: 1}, + inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ + keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, }, - { - name: "Creating on key not being watched should produce no events", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape": 1, - }, - inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape/some-sbom": { - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - }, - }, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": {}, - }, + want: map[string][]watch.Event{keyN: {{Type: watch.Added, Object: obj}}, keyK: {}}, + }, { + name: "Creating on key not being watched should produce no events", + start: map[string]int{keyK: 1}, + inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ + keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, }, - } + want: map[string][]watch.Event{keyN: {}}, + }, { + name: "Sending to stopped watch should not produce an event", + start: map[string]int{keyN: 3}, + inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ + keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}}, + }, + stopBefore: map[string]int{keyN: 1}, + want: map[string][]watch.Event{keyN: { + {Type: watch.Added, Object: obj}, + {Type: watch.Added, Object: obj}, + }}, + }} for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { @@ -222,139 +158,63 @@ func TestFilesystemStoragePublishesToMatchingWatch(t *testing.T) { ctx := context.Background() opts := storage.ListOptions{} - watchSlicesByKey := map[string][]watch.Interface{} - for key, watchCount := range tc.inputWatchesByKey { + // Arrange watches + watchers := map[string][]watch.Interface{} + for key, watchCount := range tc.start { for i := 0; i < watchCount; i++ { - watch, _ := s.Watch(ctx, key, opts) - currentWatchSlice := watchSlicesByKey[key] - currentWatchSlice = append(currentWatchSlice, watch) - watchSlicesByKey[key] = currentWatchSlice + w, _ := s.Watch(ctx, key, opts) + watchers[key] = append(watchers[key], w) } } - var ttl uint64 = 0 - out := &v1beta1.SBOMSPDXv2p3{} - for key, object := range tc.inputObjects { - _ = s.Create(ctx, key, object, out, ttl) - } - - for key, expectedEvents := range tc.expectedEvents { - watches := watchSlicesByKey[key] - - gotEvents := []watch.Event{} - for idx := range watches { + // Primitives to stop 
the watchers gracefully + var ( + done = make(chan bool, 1) + wait = func() { select { - case gotEvent := <-watches[idx].ResultChan(): - gotEvents = append(gotEvents, gotEvent) + case <-done: case <-time.After(chanWaitTimeout): - // Timed out, no event received - continue + t.Errorf("Timed out trying to stop watches") } } - assert.Equal(t, expectedEvents, gotEvents) - } - - }) - } -} - -func TestFilesystemStorageWatchStop(t *testing.T) { - tt := []struct { - name string - inputWatchesByKey map[string]int - inputStopWatchesAtIndex map[string]int - inputObjects map[string]*v1beta1.SBOMSPDXv2p3 - expectedEvents map[string][]watch.Event - }{ - { - name: "Sending to stopped watch should not produce an event", - inputWatchesByKey: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": 3, - }, - inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape/some-sbom": { - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - }, - }, - }, - inputStopWatchesAtIndex: map[string]int{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": 1, - }, - expectedEvents: map[string][]watch.Event{ - "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/not-kubescape": { - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - { - Type: watch.Added, - Object: &v1beta1.SBOMSPDXv2p3{ - ObjectMeta: v1.ObjectMeta{ - Name: "some-sbom", - ResourceVersion: "1", - }, - }, - }, - }, - }, - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot) - ctx := context.Background() - opts := storage.ListOptions{} - - // Arrange watches - watchSlicesByKey := map[string][]watch.Interface{} - for key, watchCount := range tc.inputWatchesByKey { - for i := 0; i < watchCount; i++ { - watch, _ := s.Watch(ctx, key, opts) - currentWatchSlice := watchSlicesByKey[key] - currentWatchSlice = append(currentWatchSlice, watch) - watchSlicesByKey[key] = currentWatchSlice + stopWatchers = func(ws map[string]int) { + for key, i := range ws { + watchers[key][i].Stop() + } + done <- true } - } + ) - // Arrange stopping of some watches - for key, watchIdx := range tc.inputStopWatchesAtIndex { - watchSlice := watchSlicesByKey[key] - watchSlice[watchIdx].Stop() - } - - // Act out the creation operation - var ttl uint64 = 0 - out := &v1beta1.SBOMSPDXv2p3{} - for key, object := range tc.inputObjects { - _ = s.Create(ctx, key, object, out, ttl) + go stopWatchers(tc.stopBefore) + wait() + { // Act out the creation operation + var ttl uint64 = 0 + out := &v1beta1.SBOMSPDXv2p3{} + for key, object := range tc.inputObjects { + _ = s.Create(ctx, key, object, out, ttl) + } + time.Sleep(chanWaitTimeout) // Create notifications happen asynchronously } + go stopWatchers(tc.stopAfter) + wait() // Assert the expected events - for key, expectedEvents := range tc.expectedEvents { - watches := watchSlicesByKey[key] - + for key, wantEvents := range tc.want { gotEvents := []watch.Event{} - for idx := range watches { + for _, w := range watchers[key] { select { - case gotEvent, ok := <-watches[idx].ResultChan(): + case ev, ok := <-w.ResultChan(): // Skip values from closed channels if !ok { continue } - gotEvents = append(gotEvents, gotEvent) + gotEvents = append(gotEvents, ev) case <-time.After(chanWaitTimeout): // Timed out, no event received continue } } - assert.Equal(t, expectedEvents, gotEvents) 
+				assert.Equal(t, wantEvents, gotEvents)
 			}
 		})
 	}
@@ -409,13 +269,11 @@ func TestWatchGuaranteedUpdateProducesMatchingEvents(t *testing.T) {
 			s := NewStorageImpl(afero.NewMemMapFs(), DefaultStorageRoot)
 			opts := storage.ListOptions{}
 
-			watchSlicesByKey := map[string][]watch.Interface{}
+			watchers := map[string][]watch.Interface{}
 			for key, watchCount := range tc.inputWatchesByKey {
 				for i := 0; i < watchCount; i++ {
 					watch, _ := s.Watch(context.TODO(), key, opts)
-					currentWatchSlice := watchSlicesByKey[key]
-					currentWatchSlice = append(currentWatchSlice, watch)
-					watchSlicesByKey[key] = currentWatchSlice
+					watchers[key] = append(watchers[key], watch)
 				}
 			}
 
@@ -423,13 +281,11 @@ func TestWatchGuaranteedUpdateProducesMatchingEvents(t *testing.T) {
 			_ = s.GuaranteedUpdate(context.TODO(), tc.args.key, destination, tc.args.ignoreNotFound, tc.args.preconditions, tc.args.tryUpdate, tc.args.cachedExistingObject)
 
 			for key, expectedEvents := range tc.expectedEvents {
-				watches := watchSlicesByKey[key]
-
 				gotEvents := []watch.Event{}
-				for idx := range watches {
+				for _, w := range watchers[key] {
 					select {
-					case gotEvent := <-watches[idx].ResultChan():
-						gotEvents = append(gotEvents, gotEvent)
+					case ev := <-w.ResultChan():
+						gotEvents = append(gotEvents, ev)
 					case <-time.After(chanWaitTimeout):
 						// Timed out, no event received
 						continue

From 019361919c3a02afc06f85a699d728975f8f1536 Mon Sep 17 00:00:00 2001
From: ttimonen
Date: Wed, 24 Jul 2024 15:12:28 +0000
Subject: [PATCH 2/3] fix(storage) Remove the deadlock in the watch.Interface
 implementation.

Signed-off-by: ttimonen
---
 pkg/registry/file/storage.go    |   7 +-
 pkg/registry/file/watch.go      | 119 ++++++++++++++++++++++++--------
 pkg/registry/file/watch_test.go |  24 +++++++
 3 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go
index 4e4518e27..023c70686 100644
--- a/pkg/registry/file/storage.go
+++ b/pkg/registry/file/storage.go
@@ -260,9 +260,10 @@ func (s *StorageImpl) Watch(ctx context.Context, key string, _ storage.ListOptio
 	_, span := otel.Tracer("").Start(ctx, "StorageImpl.Watch")
 	span.SetAttributes(attribute.String("key", key))
 	defer span.End()
-	newWatcher := newWatcher(make(chan watch.Event))
-	s.watchDispatcher.Register(key, newWatcher)
-	return newWatcher, nil
+	// TODO(ttimonen) Should we use context.WithoutCancel, or does the parent ctx lifetime match expectations?
+	nw := newWatcher(ctx)
+	s.watchDispatcher.Register(key, nw)
+	return nw, nil
 }
 
 // Get unmarshals object found at key into objPtr. On a not found error, will either
diff --git a/pkg/registry/file/watch.go b/pkg/registry/file/watch.go
index fc345e20b..e894eae6a 100644
--- a/pkg/registry/file/watch.go
+++ b/pkg/registry/file/watch.go
@@ -1,10 +1,10 @@
 package file
 
 import (
+	"context"
 	"errors"
 	"path"
 	"slices"
-	"sync"
 
 	"github.com/puzpuzpuz/xsync/v2"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -15,41 +15,106 @@ var (
 	errInvalidKey = errors.New("Provided key is invalid")
 )
 
-// watcher receives and forwards events to its listeners
+/*
+watcher receives and forwards events to its listeners.
+
+In particular, it implements watch.Interface. But what does that mean?
+The formal sense is easy: implement the Stop and ResultChan methods with the right
+signatures. The semantics, however, are much more convoluted.
+For example:
+  - Can you call Stop multiple times? What should happen then?
+    (The implementation used to crash here.)
+  - What should happen on Stop to events that have happened but whose Results are
+    not retrieved yet?
+    (The implementation used to sometimes drop them, and sometimes deadlock both the
+    Stopper and the watcher, leaking goroutines and memory as well.)
+  - What should the behavior be if the client does not immediately read from ResultChan()?
+    (The implementation used to sometimes queue them into a stack of new goroutines, and
+    sometimes block other notifications on the same event/key until processed.)
+
+The API doc (apimachinery/pkg/watch.Interface) says:
+1 We shouldn't leak resources after Stop (goroutines, memory).
+2 ResultChan receives all events that "happened before" Stop() (what that means exactly, TBD).
+3 ResultChan should be closed eventually after a Stop() call.
+
+The actual usage of the API implies:
+
+ 4 Stop() can be called multiple times.
+ 5 Stop() can also be called when the queue is not empty.
+ 6 The queue might also never be emptied by the client.
+ 7 The queue of the watcher is not necessarily being read all the time
+   (for some values of "all").
+
+Following Hyrum's Law, this shall be the implicit interface to write the watcher against.
+
+How to implement this?
+The problem with #3 is that closing a channel is typically how the sender tells the receiver
+that it can stop; here the roles are inverted, so the control flow the primitives support
+naturally works against us. The best long-term choice would be to change the API doc, but
+let's try to accommodate it instead.
+
+Constraint #7 is particularly challenging as well. We have only bad options.
+Basically, the underlying issue is that the server side has to implement the queue management
+strategy, yet the client has full control over what kind of queue it is and how it gets consumed.
+Options:
+a) "No queue" on the server side (there's always a queue). I.e. the queue is pushed outside
+the server by causing backpressure, halting server processing. The upside is that this is the
+easiest to implement and follows the spec. Unfortunately, server-side performance is going to
+be particularly miserable.
+b) No queue, but fall back to dropping messages. This breaks constraint #2, though.
+c) Fixed-size queue; fall back to a or b when the queue gets full.
+d) Infinite queue. Unfortunately, only pure Turing machines have those. Trying to construct one
+leads to solution (c_a) anyway, but with less predictable collapses and pushback.
+
+OK. So the challenge with all the variants, API-wise, is that we have no way of communicating
+that we are backlogged or that we are dropping messages.
+Let's choose one. c_a seems like the path of least surprise (c_b is possible as well).
+*/
 type watcher struct {
-	stopped bool
-	eventCh chan watch.Event
-	m       sync.RWMutex
+	ctx         context.Context
+	stop        context.CancelFunc
+	outCh, inCh chan watch.Event
 }
 
-// newWatcher creates a new watcher with a given channel
-func newWatcher(wc chan watch.Event) *watcher {
-	return &watcher{
-		stopped: false,
-		eventCh: wc,
+// newWatcher creates a new watcher
+func newWatcher(ctx context.Context) *watcher {
+	ctx, cn := context.WithCancel(ctx)
+	w := &watcher{
+		ctx:   ctx,
+		stop:  cn,
+		outCh: make(chan watch.Event, 100),
+		inCh:  make(chan watch.Event),
 	}
+	go w.shipIt()
+	return w
 }
 
-func (w *watcher) Stop() {
-	w.m.Lock()
-	defer w.m.Unlock()
-	w.stopped = true
-	close(w.eventCh)
-}
-
-func (w *watcher) ResultChan() <-chan watch.Event {
-	return w.eventCh
-}
+func (w *watcher) Stop()                          { w.stop() }
+func (w *watcher) ResultChan() <-chan watch.Event { return w.outCh }
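The refactor above collapses two nearly identical tests into one table and funnels every watcher shutdown through a done channel guarded by a timeout, so a deadlocked Stop fails the test instead of hanging the suite. A minimal standalone sketch of that idiom, with illustrative names that are not part of the patch:

```go
package main

import (
	"fmt"
	"time"
)

// runWithTimeout runs fn in a goroutine and waits for it to finish,
// failing if it takes longer than timeout. This is the same done/wait
// shape the refactored test uses to stop watchers safely.
func runWithTimeout(fn func(), timeout time.Duration) error {
	done := make(chan bool, 1)
	go func() {
		fn()
		done <- true
	}()
	select {
	case <-done:
		return nil
	case <-time.After(timeout):
		// Note: the timed-out goroutine leaks; in the real test this signals a bug.
		return fmt.Errorf("timed out after %v", timeout)
	}
}

func main() {
	fmt.Println(runWithTimeout(func() {}, time.Second))                     // <nil>
	fmt.Println(runWithTimeout(func() { select {} }, 10*time.Millisecond)) // timed out after 10ms
}
```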
+
+// shipIt is the only method writing to outCh. It is called only once per watcher
+// and returns when the context is gone.
+// See the discussion on constraint #3 above for the rationale behind this approach.
+func (w *watcher) shipIt() {
+	defer close(w.outCh)
+	for {
+		var ev watch.Event
+		select { // We want both reads and writes to be interruptible, hence the complexity here.
+		case <-w.ctx.Done():
+			return
+		case ev = <-w.inCh:
+		}
+		select {
+		case <-w.ctx.Done():
+			return
+		case w.outCh <- ev:
+		}
+	}
+}
 
-func (w *watcher) notify(e watch.Event) bool {
-	w.m.RLock()
-	defer w.m.RUnlock()
-	if w.stopped {
-		return false
+func (w *watcher) notify(e watch.Event) {
+	select {
+	case w.inCh <- e:
+	case <-w.ctx.Done():
 	}
-
-	w.eventCh <- e
-	return true
 }
 
 type watchersList []*watcher
diff --git a/pkg/registry/file/watch_test.go b/pkg/registry/file/watch_test.go
index 0ddaacc7b..bec3b0170 100644
--- a/pkg/registry/file/watch_test.go
+++ b/pkg/registry/file/watch_test.go
@@ -150,6 +150,30 @@ func TestFilesystemStorageWatchPublishing(t *testing.T) {
 			{Type: watch.Added, Object: obj},
 			{Type: watch.Added, Object: obj},
 		}},
+	}, {
+		name:  "Stopping watch after send shouldn't deadlock",
+		start: map[string]int{keyN: 3},
+		inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{
+			keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}},
+		},
+		stopAfter: map[string]int{keyN: 0},
+		want: map[string][]watch.Event{keyN: {
+			{Type: watch.Added, Object: obj},
+			{Type: watch.Added, Object: obj},
+			{Type: watch.Added, Object: obj},
+		}},
+	}, {
+		name:  "Stopping watch twice is ok",
+		start: map[string]int{keyN: 3},
+		inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{
+			keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}},
+		},
+		stopBefore: map[string]int{keyN: 1},
+		stopAfter:  map[string]int{keyN: 1},
+		want: map[string][]watch.Event{keyN: {
+			{Type: watch.Added, Object: obj},
+			{Type: watch.Added, Object: obj},
+		}},
 	}}
 
 	for _, tc := range tt {
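The heart of this second patch is the shipIt pump: a single goroutine owns outCh, every read and write sits in a select guarded by ctx.Done(), and Stop becomes an idempotent context cancellation. The same pattern sketched in isolation, assuming nothing beyond the standard library (pump and the channel names are illustrative, not the patch's API):

```go
package main

import (
	"context"
	"fmt"
)

// pump forwards values from in to out until ctx is cancelled. Both the
// receive and the send are guarded by ctx.Done(), so neither side can
// deadlock the other; this mirrors the watcher's shipIt loop.
func pump[T any](ctx context.Context, in <-chan T, out chan<- T) {
	defer close(out) // the producer side closes the channel, satisfying constraint #3
	for {
		var v T
		select {
		case <-ctx.Done():
			return
		case v = <-in:
		}
		select {
		case <-ctx.Done():
			return
		case out <- v:
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in, out := make(chan int), make(chan int, 100) // buffered out, as in the patch
	go pump(ctx, in, out)
	in <- 42
	fmt.Println(<-out) // 42
	cancel()
	cancel() // cancelling twice is a no-op, which is what makes Stop idempotent
}
```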
From e6665bc9fc6a84899fa620812286f9f61eee1252 Mon Sep 17 00:00:00 2001
From: ttimonen
Date: Wed, 24 Jul 2024 16:41:16 +0000
Subject: [PATCH 3/3] fix(storage) Remove the memory leak from watchDispatcher.

While watchDispatcher doesn't grow unboundedly anymore, it can't be torn
down trivially. This is fixable, but a fix for that would leak more
complexity into the storage layer.

Signed-off-by: ttimonen
---
 pkg/registry/file/storage.go |  2 +-
 pkg/registry/file/watch.go   | 54 ++++++++++++++++++++++++------------
 2 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go
index 023c70686..0a93a3888 100644
--- a/pkg/registry/file/storage.go
+++ b/pkg/registry/file/storage.go
@@ -50,7 +50,7 @@ type StorageImpl struct {
 	processor       Processor
 	root            string
 	versioner       storage.Versioner
-	watchDispatcher watchDispatcher
+	watchDispatcher *watchDispatcher
 }
 
 // StorageQuerier wraps the storage.Interface and adds some extra methods which are used by the storage implementation.
diff --git a/pkg/registry/file/watch.go b/pkg/registry/file/watch.go
index e894eae6a..7c657e465 100644
--- a/pkg/registry/file/watch.go
+++ b/pkg/registry/file/watch.go
@@ -121,17 +121,16 @@ type watchersList []*watcher
 
 // watchDispatcher dispatches events to registered watches
 //
-// TODO(vladklokun): Please keep in mind that this dispatcher does not offer any collection of
-// resources left over by the stopped watches! There are multiple ways to go about it:
-// 1. On-Stop cleanup, where a watcher would notifies the dispatcher about being stopped, and that it can be cleaned up
-// 2. Garbage collection. Periodic background cleanup. Poses challenges as this
-// might be a contended concurrent resource.
+// TODO(ttimonen): There's currently no way to gracefully take down watchDispatcher without leaking a goroutine.
 type watchDispatcher struct {
 	watchesByKey *xsync.MapOf[string, watchersList]
+	gcCh         chan string
 }
 
-func newWatchDispatcher() watchDispatcher {
-	return watchDispatcher{xsync.NewMapOf[watchersList]()}
+func newWatchDispatcher() *watchDispatcher {
+	wd := watchDispatcher{xsync.NewMapOf[watchersList](), make(chan string)}
+	go wd.gcer()
+	return &wd
 }
 
 func extractKeysToNotify(key string) []string {
@@ -152,6 +151,30 @@ func (wd *watchDispatcher) Register(key string, w *watcher) {
 	wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) {
 		return append(l, w), false
 	})
+	go func() {
+		<-w.ctx.Done()
+		wd.gcCh <- key
+	}()
+}
+
+func (wd *watchDispatcher) gcer() {
+	for key := range wd.gcCh { // This is an O(n) op, where n is the number of watchers on a particular key.
+		wd.watchesByKey.Compute(key, func(l watchersList, _ bool) (watchersList, bool) {
+			if len(l) == 0 {
+				return nil, true
+			}
+			out := make(watchersList, 0, len(l)-1) // Preallocate with the intent to drop one element.
+			// Dropping in place would be more efficient alloc-wise, but would cause data races on notify
+			// the way it's currently implemented.
+			for _, w := range l {
+				if w.ctx.Err() == nil {
+					out = append(out, w)
+				}
+			}
+			return out, len(out) == 0
+		})
+		// TODO(ttimonen) Sleeping a bit here might give batch cleanup improvements, maybe.
+	}
 }
 
 // Added dispatches an "Added" event to appropriate watchers
@@ -171,15 +194,12 @@ func (wd *watchDispatcher) Modified(key string, obj runtime.Object) {
 
 // notify notifies the listeners of a given key about an event of a given eventType about a given obj
 func (wd *watchDispatcher) notify(key string, eventType watch.EventType, obj runtime.Object) {
-	// Don't block callers by publishing in a separate goroutine
-	// TODO(ttimonen) This is kind of expensive way to manage queue, yet watchers might block each other.
-	go func() {
-		event := watch.Event{Type: eventType, Object: obj}
-		for _, part := range extractKeysToNotify(key) {
-			ws, _ := wd.watchesByKey.Load(part)
-			for _, w := range ws {
-				w.notify(event)
-			}
+	// Notify calls do not normally block, unless the client side is misbehaving.
+	event := watch.Event{Type: eventType, Object: obj}
+	for _, part := range extractKeysToNotify(key) {
+		ws, _ := wd.watchesByKey.Load(part)
+		for _, w := range ws {
+			w.notify(event)
 		}
-	}()
+	}
 }
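Seen from the client side, the contract these three patches defend is simple: Stop may be called any number of times, and ResultChan is closed eventually after Stop, so a plain range loop terminates. A hedged usage sketch (consume is illustrative; watch.NewFake is apimachinery's own test double, standing in for the storage watcher here):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

// consume drains a watch.Interface until its result channel closes. The
// deferred Stop is redundant if the producer already stopped; with the
// patched watcher that is a harmless no-op rather than a crash or deadlock.
func consume(w watch.Interface) {
	defer w.Stop()
	for ev := range w.ResultChan() {
		fmt.Println("event:", ev.Type)
	}
}

func main() {
	fw := watch.NewFake()
	go func() {
		fw.Add(nil) // send one Added event (a real caller passes a runtime.Object)
		fw.Stop()   // closes the result channel, ending the range loop below
	}()
	consume(fw)
}
```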