Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: huabing zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Aug 21, 2023
1 parent 71d67b5 commit 52998ad
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 34 deletions.
12 changes: 7 additions & 5 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,13 @@ func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statu
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) > 0 {
err := snapshot.ConstructVersionMap()
if err != nil {
return err
}
if len(info.deltaWatches) == 0 {
return nil
}

err := snapshot.ConstructVersionMap()
if err != nil {
return err
}

// If ADS is enabled we need to order response delta watches so we guarantee
Expand Down
29 changes: 5 additions & 24 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/config"
Expand Down Expand Up @@ -74,9 +73,6 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba
}

func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
// create a sharedChan for the watches to send ordered responses to
sharedChan := make(chan cache.DeltaResponse, types.UnknownType)

streamID := atomic.AddInt64(&s.streamCount, 1)

// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
Expand All @@ -85,24 +81,10 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// a collection of stack allocated watches per request type
watches := newWatches()

// use a single go routine to send responses to the muxedResponses channel to retain resource orders
go func() {
for {
select {
case resp, more := <-sharedChan:
if !more {
return
}
watches.deltaMuxedResponses <- resp
}
}
}()

var node = &core.Node{}

defer func() {
watches.Cancel()
close(sharedChan)
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID, node)
}
Expand Down Expand Up @@ -214,18 +196,17 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

if ordered {
// Use the shared channel for ordered responses
watch.responses = sharedChan
watch.isSharedChan = true
// Use the shared channel to keep the order of responses.
watch.UseSharedResponseChan(watches.deltaMuxedResponses)
} else {
watch.responses = make(chan cache.DeltaResponse, 1)
watch.MakeResponseChan()
}
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch

// just handle normal non-ordered responses here
// all ordered responses are handled in a single go routine
if !watch.isSharedChan {
// all ordered responses are sent to the muxedResponses channel directly
if !watch.useSharedChan {
go func() {
resp, more := <-watch.responses
if more {
Expand Down
20 changes: 15 additions & 5 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,30 @@ func (w *watches) Cancel() {

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
isSharedChan bool // is this watch using a shared channel
cancel func()
nonce string
responses chan cache.DeltaResponse
useSharedChan bool // is this watch using a shared channel
cancel func()
nonce string

state stream.StreamState
}

func (w *watch) MakeResponseChan() {
w.responses = make(chan cache.DeltaResponse, 1)
w.useSharedChan = false
}

func (w *watch) UseSharedResponseChan(sharedChan chan cache.DeltaResponse) {
w.responses = sharedChan
w.useSharedChan = true
}

// Cancel calls terminate and cancel
func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil && !w.isSharedChan {
if w.responses != nil && !w.useSharedChan {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
Expand Down

0 comments on commit 52998ad

Please sign in to comment.