diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 9152ab2d96..73151da181 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -11,6 +11,7 @@ import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/config" @@ -37,6 +38,13 @@ type Callbacks interface { var deltaErrorResponse = &cache.RawDeltaResponse{} +// WithOrderedADS enables the internal flag to order responses strictly. +func WithOrderedADS() config.XDSOption { + return func(o *config.Opts) { + o.Ordered = true + } +} + type server struct { cache cache.ConfigWatcher callbacks Callbacks @@ -65,7 +73,10 @@ func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callba return s } -func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { +func (s *server) processDelta(str stream.DeltaStream, reqCh chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error { + // create a sharedChan for the watches to send responses to + sharedChan := make(chan cache.DeltaResponse, types.UnknownType) + streamID := atomic.AddInt64(&s.streamCount, 1) // streamNonce holds a unique nonce for req-resp pairs per xDS stream. @@ -78,6 +89,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De defer func() { watches.Cancel() + close(sharedChan) if s.callbacks != nil { s.callbacks.OnDeltaStreamClosed(streamID, node) } @@ -156,11 +168,15 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De req.Node = node } + ordered := false // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { if req.TypeUrl == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } + if s.opts.Ordered { + ordered = true + } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -184,7 +200,13 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.subscribe(req.GetResourceNamesSubscribe(), &watch.state) s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state) - watch.responses = make(chan cache.DeltaResponse, 1) + if ordered { + // Use the shared channel for ordered responses + watch.responses = sharedChan + watch.isSharedChan = true + } else { + watch.responses = make(chan cache.DeltaResponse, 1) + } watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses) watches.deltaWatches[typeURL] = watch diff --git a/pkg/server/delta/v3/watches.go b/pkg/server/delta/v3/watches.go index c88548388a..08ae586984 100644 --- a/pkg/server/delta/v3/watches.go +++ b/pkg/server/delta/v3/watches.go @@ -32,9 +32,10 @@ func (w *watches) Cancel() { // watch contains the necessary modifiables for receiving resource responses type watch struct { - responses chan cache.DeltaResponse - cancel func() - nonce string + responses chan cache.DeltaResponse + isSharedChan bool // is this watch using a shared channel + cancel func() + nonce string state stream.StreamState } @@ -44,7 +45,7 @@ func (w *watch) Cancel() { if w.cancel != nil { w.cancel() } - if w.responses != nil { + if w.responses != nil && !w.isSharedChan { // w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here // This is needed to release resources taken by goroutines watching this channel close(w.responses) diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index f8429f2997..876f275f87 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -15,6 +15,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -345,7 +346,9 @@ func TestDeltaAggregatedHandlers(t *testing.T) { resp.recv <- r } - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + // We create the server with the optional ordered ADS flag so we guarantee resource + // ordering over the stream. + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, delta.WithOrderedADS()) go func() { err := s.DeltaAggregatedResources(resp) assert.NoError(t, err)