Skip to content

Commit

Permalink
Add WithOrededADS option to delta xds server
Browse files Browse the repository at this point in the history
Signed-off-by: huabing zhao <[email protected]>
  • Loading branch information
zhaohuabing committed Aug 15, 2023
1 parent ef293f8 commit 487dc31
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
27 changes: 25 additions & 2 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/config"
Expand All @@ -37,6 +38,13 @@ type Callbacks interface {

var deltaErrorResponse = &cache.RawDeltaResponse{}

// WithOrderedADS enables the internal flag to order responses strictly.
func WithOrderedADS() config.XDSOption {
return func(o *config.Opts) {
o.Ordered = true
}
}

type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
Expand Down Expand Up @@ -65,7 +73,10 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba
return s
}

func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
// create a sharedChan for the watches to send responses to
sharedChan := make(chan cache.DeltaResponse, types.UnknownType)

streamID := atomic.AddInt64(&s.streamCount, 1)

// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
Expand All @@ -78,6 +89,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De

defer func() {
watches.Cancel()
close(sharedChan)
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID, node)
}
Expand Down Expand Up @@ -156,11 +168,15 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
req.Node = node
}

ordered := false
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
if s.opts.Ordered {
ordered = true
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
Expand All @@ -184,8 +200,15 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)

watch.responses = make(chan cache.DeltaResponse, 1)
if ordered {
// Use the shared channel for ordered responses
watch.responses = sharedChan
watch.isSharedChan = true
} else {
watch.responses = make(chan cache.DeltaResponse, 1)
}
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)

watches.deltaWatches[typeURL] = watch

go func() {
Expand Down
9 changes: 5 additions & 4 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func (w *watches) Cancel() {

// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
responses chan cache.DeltaResponse
isSharedChan bool // is this watch using a shared channel
cancel func()
nonce string

state stream.StreamState
}
Expand All @@ -44,7 +45,7 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil {
if w.responses != nil && !w.isSharedChan {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/envoyproxy/go-control-plane/pkg/test/resource/v3"
Expand Down Expand Up @@ -345,7 +346,9 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
resp.recv <- r
}

s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
// We create the server with the optional ordered ADS flag so we guarantee resource
// ordering over the stream.
s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS())
go func() {
err := s.DeltaAggregatedResources(resp)
assert.NoError(t, err)
Expand Down

0 comments on commit 487dc31

Please sign in to comment.