Skip to content

Commit

Permalink
purge response channel before processing a request to avoid deadlock
Browse files Browse the repository at this point in the history
Signed-off-by: huabing zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Sep 13, 2023
1 parent f7dfca1 commit 34cb745
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 31 deletions.
71 changes: 48 additions & 23 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba
return s
}

func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
streamID := atomic.AddInt64(&s.streamCount, 1)

// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
Expand All @@ -83,7 +83,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt
}
}()

// Sends a response, returns the new stream nonce
// sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
Expand All @@ -103,6 +103,44 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt
return response.Nonce, str.Send(response)
}

// process handles a single delta response: it forwards the response on the
// stream via send, then records the returned nonce and the response's next
// resource version map on the watch registered for the response's type URL.
//
// It returns an Unavailable status error when resp is deltaErrorResponse, or
// the error reported by send; otherwise nil.
process := func(resp cache.DeltaResponse) error {
	typ := resp.GetDeltaRequest().GetTypeUrl()
	if resp == deltaErrorResponse {
		// Keep the format string constant so that a '%' inside the type URL
		// cannot be misinterpreted as a formatting verb.
		return status.Errorf(codes.Unavailable, "%s watch failed", typ)
	}

	nonce, err := send(resp)
	if err != nil {
		return err
	}

	// deltaWatches stores watch values, so the entry is copied out, updated
	// and written back.
	watch := watches.deltaWatches[typ]
	watch.nonce = nonce

	watch.state.SetResourceVersions(resp.GetNextVersionMap())
	watches.deltaWatches[typ] = watch
	return nil
}

// processAll purges the deltaMuxedResponses channel: it hands every response
// that is currently buffered to process, without blocking.
//
// It returns nil once the channel has nothing left to read (or has been
// closed), or the first error returned by process.
processAll := func() error {
	for {
		select {
		// We watch the multiplexed channel for incoming responses.
		case resp, more := <-watches.deltaMuxedResponses:
			if !more {
				// A closed channel is always ready to receive. A bare break
				// would only leave the select, not the for loop, and spin
				// forever, so stop draining here.
				return nil
			}
			if err := process(resp); err != nil {
				return err
			}
		default:
			// nothing buffered right now
			return nil
		}
	}
}

if s.callbacks != nil {
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
Expand All @@ -113,41 +151,29 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt
select {
case <-s.ctx.Done():
return nil
// We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
// input stream ended or errored out
if !more {
break
}

typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}

nonce, err := send(resp)
if err != nil {
if err := process(resp); err != nil {
return err
}

watch := watches.deltaWatches[typ]
watch.nonce = nonce

watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}

if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}

// make sure responses are processed prior to new requests to avoid deadlock
if len(watches.deltaMuxedResponses) > 0 {
go func() {
reqCh <- req
}()
break
// make sure all existing responses are processed prior to new requests to avoid deadlock
if err := processAll(); err != nil {
return err
}

if s.callbacks != nil {
Expand Down Expand Up @@ -192,8 +218,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.Delt
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = watches.deltaMuxedResponses
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watches.deltaWatches[typeURL] = watch
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type watches struct {
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
Expand All @@ -28,13 +32,14 @@ func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}

close(w.deltaMuxedResponses)
}

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
cancel func()
nonce string

state stream.StreamState
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/server/delta/v3/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,18 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

func TestDeltaWatches(t *testing.T) {
t.Run("watches response channels are properly closed when the watches are canceled", func(t *testing.T) {
watches := newWatches()

cancelCount := 0
var channels []chan cache.DeltaResponse
// create a few watches, and ensure that the cancel functions are called and the channels are closed
for i := 0; i < 5; i++ {
newWatch := watch{}
if i%2 == 0 {
newWatch.cancel = func() { cancelCount++ }
newWatch.responses = make(chan cache.DeltaResponse)
channels = append(channels, newWatch.responses)
}

watches.deltaWatches[strconv.Itoa(i)] = newWatch
Expand Down

0 comments on commit 34cb745

Please sign in to comment.