Skip to content

Commit

Permalink
fix(storage): Remove the deadlock with watch.Interface implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ttimonen authored Jul 24, 2024
1 parent baaa53f commit da4bc64
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 30 deletions.
7 changes: 4 additions & 3 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,10 @@ func (s *StorageImpl) Watch(ctx context.Context, key string, _ storage.ListOptio
_, span := otel.Tracer("").Start(ctx, "StorageImpl.Watch")
span.SetAttributes(attribute.String("key", key))
defer span.End()
newWatcher := newWatcher(make(chan watch.Event))
s.watchDispatcher.Register(key, newWatcher)
return newWatcher, nil
// TODO(ttimonen) Should we do ctx.WithoutCancel; or does the parent ctx lifetime match with expectations?
nw := newWatcher(ctx)
s.watchDispatcher.Register(key, nw)
return nw, nil
}

// Get unmarshals object found at key into objPtr. On a not found error, will either
Expand Down
119 changes: 92 additions & 27 deletions pkg/registry/file/watch.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package file

import (
"context"
"errors"
"path"
"slices"
"sync"

"github.com/puzpuzpuz/xsync/v2"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -15,41 +15,106 @@ var (
errInvalidKey = errors.New("Provided key is invalid")
)

// watcher receives and forwards events to its listeners
/*
watcher receives and forwards events to its listeners.
In particular, it implements the watch.Interface. But what does that mean?
The formal sense is easy: Implement the Stop and ResultChan methods in the right format.
The semantics however are something much more convoluted.
For example:
- Can you call Stop multiple times? What should happen then?
(The implementation used to crash here)
- What should Stop do to already-happened events whose Results are not retrieved yet?
(The implementation used to sometimes drop them, sometimes deadlock both the Stopper
and the watcher, leaking also goroutines and memory.)
- What should the behavior be if client does not immediately read from the ResultChan() ?
(The implementation used to sometimes queue them into the stack of new goroutines, yet
sometimes block other notications against the same event/key until processed)
The API doc (apimachinery/pkg/watch.Interface) says:
1 We shouldn't leak resources after Stop (goroutines, memory).
2 ResultChan receives all events "happened before" Stop() (what that means exactly, TBD).
3 ResultChan should be closed eventually after Stop() call.
The actual usage of the API implies:
4 Stop() can be called multiple times.
5 Stop() can also be called when the queue is not empty.
6 Queue might also not be emptied by client.
7 The queue of the watcher is not necessarily being read all the time
(for some values of "all").
Following the Hyrum's Law, this shall be the implicit interface to write the watcher against.
How to implement this?
Problem with #3 is that typically closing the channel is used by the sender to tell that the receiver
can stop; here the role is inverted, making the control flow supported naturally by the primitives
working against us. Best long term choice would be to change the API Doc, but let's try to accommodate
instead.
Constraint #7 is particularly challenging as well. We have only bad options.
Basically the underlying issue is that the server-side has to implement the queue management strategy,
yet client has full control of what kind of and how they are going to use the queue. Options:
a) "No queue" on server-side (there's always a queue). I.e. your queue is pushed outside the server
by causing backpressure by halting your server processing. The upside is that this is easiest to implement
and follows the spec. Unfortunately the server-side performance is going to be particularly miserable.
b) No queue, but fall back to dropping messages. This breaks the constraint #2 though.
c) Fixed queue, fall back to a or b when queue gets full.
d) Infinite queue. Unfortunately, only pure turing machines have those. Trying to construct one
leads to solution (c_a) anyway, but with less predictable collapses and pushback.
Ok. So the challenge with all the variants API-wise is that we have no way of communicating that we are
backlogged or that we are dropping messages.
Let's choose one. The c_a seems like the path of least surprise (c_b is possible as well).
*/
// watcher receives and forwards events to its listeners.
// Its lifetime is bound to ctx: calling stop (via Stop) cancels the
// context, which makes shipIt drain out and close outCh exactly once.
type watcher struct {
	ctx         context.Context    // governs the watcher's lifetime
	stop        context.CancelFunc // idempotent; safe to call any number of times
	outCh, inCh chan watch.Event   // notify feeds inCh; shipIt forwards to outCh
}

// newWatcher creates a new watcher with a given channel
func newWatcher(wc chan watch.Event) *watcher {
return &watcher{
stopped: false,
eventCh: wc,
// newWatcher creates a new watcher
func newWatcher(ctx context.Context) *watcher {
ctx, cn := context.WithCancel(ctx)
w := &watcher{
ctx: ctx,
stop: cn,
outCh: make(chan watch.Event, 100),
inCh: make(chan watch.Event),
}
go w.shipIt()
return w
}

func (w *watcher) Stop() {
w.m.Lock()
defer w.m.Unlock()
w.stopped = true
close(w.eventCh)
}

func (w *watcher) ResultChan() <-chan watch.Event {
return w.eventCh
func (w *watcher) Stop() { w.stop() }
func (w *watcher) ResultChan() <-chan watch.Event { return w.outCh }

// shipIt is the only writer to (and closer of) outCh. It runs exactly once
// per watcher and returns when the watcher's context is canceled.
// See the discussion on constraint #3 above for the rationale of this design.
func (w *watcher) shipIt() {
	defer close(w.outCh)
	for {
		// Both the receive and the forwarding send must be interruptible
		// by cancellation, hence the nested selects.
		select {
		case <-w.ctx.Done():
			return
		case ev := <-w.inCh:
			select {
			case <-w.ctx.Done():
				return
			case w.outCh <- ev:
			}
		}
	}
}

func (w *watcher) notify(e watch.Event) bool {
w.m.RLock()
defer w.m.RUnlock()
if w.stopped {
return false
func (w *watcher) notify(e watch.Event) {
select {
case w.inCh <- e:
case <-w.ctx.Done():
}

w.eventCh <- e
return true
}

// watchersList is a collection of watchers — presumably those registered
// for the same key via watchDispatcher.Register; TODO confirm against the
// dispatcher implementation (not visible here).
type watchersList []*watcher
Expand Down
24 changes: 24 additions & 0 deletions pkg/registry/file/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,30 @@ func TestFilesystemStorageWatchPublishing(t *testing.T) {
{Type: watch.Added, Object: obj},
{Type: watch.Added, Object: obj},
}},
}, {
name: "Stopping watch after send shouldn't deadlock",
start: map[string]int{keyN: 3},
inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{
keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}},
},
stopAfter: map[string]int{keyN: 0},
want: map[string][]watch.Event{keyN: {
{Type: watch.Added, Object: obj},
{Type: watch.Added, Object: obj},
{Type: watch.Added, Object: obj},
}},
}, {
name: "Stopping watch twice is ok",
start: map[string]int{keyN: 3},
inputObjects: map[string]*v1beta1.SBOMSPDXv2p3{
keyN + "/some-sbom": {ObjectMeta: v1.ObjectMeta{Name: "some-sbom"}},
},
stopBefore: map[string]int{keyN: 1},
stopAfter: map[string]int{keyN: 1},
want: map[string][]watch.Event{keyN: {
{Type: watch.Added, Object: obj},
{Type: watch.Added, Object: obj},
}},
}}

for _, tc := range tt {
Expand Down

0 comments on commit da4bc64

Please sign in to comment.