diff --git a/pkg/server/config/config.go b/pkg/server/config/config.go new file mode 100644 index 0000000000..ddc2392119 --- /dev/null +++ b/pkg/server/config/config.go @@ -0,0 +1,25 @@ +package config + +// Opts for individual xDS implementations that can be +// utilized through the functional opts pattern +type Opts struct { + // If true respond to ADS requests with a guaranteed resource ordering + Ordered bool +} + +func NewOpts() Opts { + return Opts{ + Ordered: false, + } +} + +// Each xDS implementation should implement their own functional opts. +// It is recommended that config values be added in this package specifically, +// but the individual opts functions should be in their respective +// implementation package so the import looks like the following: +// +// `sotw.WithOrderedADS()` +// `delta.WithOrderedADS()` +// +// this allows for easy inference as to which opt applies to what implementation. +type XDSOption func(*Opts) diff --git a/pkg/server/config/doc.go b/pkg/server/config/doc.go new file mode 100644 index 0000000000..2c85adfd5f --- /dev/null +++ b/pkg/server/config/doc.go @@ -0,0 +1,22 @@ +/* +Config abstracts xDS server options into a unified configuration package +that allows for easy manipulation as well as unified passage of options +to individual xDS server implementations. + +This enables code reduction as well as a unified source of config. Delta +and SOTW might have similar ordered responses through ADS and rather than +duplicating the logic across server implementations, we add the options +in this package which are passed down to each individual spec. + +Each xDS implementation should implement their own functional opts. +It is recommended that config values be added in this package specifically, +but the individual opts functions should be in their respective +implementation package so the import looks like the following: + +`sotw.WithOrderedADS()` +`delta.WithOrderedADS()` + +this allows for easy inference as to which opt applies to what implementation. +*/ + +package config diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 8edc5361e9..cece5022f5 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -13,6 +13,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -46,7 +47,7 @@ type server struct { } // NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { +func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { return &server{ cache: config, callbacks: callbacks, diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 09d275d19f..ae91188b57 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -1,13 +1,11 @@ package sotw import ( - "fmt" "reflect" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" @@ -15,53 +13,50 @@ import ( ) // process handles a bi-di stream request -func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { - // process := func(resp cache.Response) error { - // nonce, err := send(resp) - // if err != nil { - // return err - // } - // typeUrl := resp.GetRequest().TypeUrl - // watches.nonces[typeUrl] = nonce - // return nil - // } - - // processAllExcept := func(typeUrl string) error { - // for { - // select { - // case resp := <-values.responses: - // if resp.GetRequest().TypeUrl != typeUrl { - // if err := process(resp); err != nil { - // return err - // } - // } - // default: - // return nil - // } - // } - // } - - // node may only be set on the first discovery request - var node = &core.Node{} +func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { + process := func(resp cache.Response) error { + nonce, err := sw.send(resp) + if err != nil { + return err + } + + typeURL := resp.GetRequest().TypeUrl + sw.watches.responders[typeURL].nonce = nonce + return nil + } + + processAllExcept := func(typeURL string) error { + for { + index, value, ok := reflect.Select(sw.watches.cases) + // index is checked because if we receive a value + // from the Done() or request channel here + // we can ignore. The main control loop will handle that accordingly. + // This is strictly for handling incoming resources off the dynamic channels list. + if !ok || index < 2 { + // We just exit and assume the ordered parent resource isn't ready if !ok + return nil + } + + res := value.Interface().(cache.Response) + if res.GetRequest().TypeUrl != typeURL { + if err := process(res); err != nil { + return err + } + } + } + } // This control loop strictly orders resources when running in ADS mode. // It should be treated as a child process of the original process() loop - // and should return on close of stream or error. This will cause the cleanup routines - // in the parent process() loop to execute. + // and should return on close of stream or error. + // This will cause the cleanup routinesin the parent process() loop to execute. for { - fmt.Printf("%+v\n", sw.watches.cases) index, value, ok := reflect.Select(sw.watches.cases) - fmt.Println(index) - fmt.Println(value) - fmt.Println(ok) - switch index { case 0: - fmt.Println("recieved a Done() on the ADS code path.") // ctx.Done() -> no further computation is needed return nil case 1: - fmt.Println("This is the ordered ADS code path request pipeline") if !ok { return nil } @@ -73,9 +68,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery // Only first request is guaranteed to hold node info so if it's missing, reassign. if req.Node != nil { - node = req.Node + sw.node = req.Node } else { - req.Node = node + req.Node = sw.node } // nonces can be reused across streams; we verify nonce only if nonce is not initialized @@ -96,12 +91,12 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery } } - // if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { - // if lastResponse.nonce == "" || lastResponse.nonce == nonce { - // // Let's record Resource names that a client has received. - // streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) - // } - // } + if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse.nonce == "" || lastResponse.nonce == nonce { + // Let's record Resource names that a client has received. + sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + } + } typeURL := req.GetTypeUrl() responder := make(chan cache.Response, 1) @@ -111,6 +106,11 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery if w.nonce == "" || w.nonce == nonce { w.close() + // Only process if we have an existing watch otherwise go ahead and create. + if err := processAllExcept(typeURL); err != nil { + return err + } + sw.watches.addWatch(typeURL, &watch{ cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder), response: responder, @@ -128,9 +128,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) default: - fmt.Println("This is the ordered ADS code path default pipeline") - - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL + // Channel n -> these are the dynamic list of responders + // that correspond to the stream request typeURL + // No special processing here for ordering. We just send on the channels. if !ok { // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) @@ -146,17 +146,3 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery } } } - -// typeUrl := req.TypeUrl -// responseNonce, seen := values.nonces[typeUrl] -// if !seen || responseNonce == nonce { -// if cancel, seen := values.cancellations[typeUrl]; seen { -// if cancel != nil { -// cancel() -// } -// if err := processAllExcept(typeUrl); err != nil { -// return err -// } -// } -// values.cancellations[typeUrl] = s.cache.CreateWatch(req, values.responses) -// } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 54844736f2..5ebc4d6918 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -20,8 +20,10 @@ import ( "errors" "strconv" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" ) @@ -43,8 +45,23 @@ type Callbacks interface { } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server { - return &server{cache: config, callbacks: callbacks, ctx: ctx} +func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server { + s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()} + + // Parse through our options + for _, opt := range opts { + opt(&s.opts) + } + + return s +} + +// WithOrderedADS enables the internal flag to order responces +// strictly. +func WithOrderedADS() config.XDSOption { + return func(o *config.Opts) { + o.Ordered = true + } } type server struct { @@ -54,17 +71,29 @@ type server struct { // streamCount for counting bi-di streams streamCount int64 + + // Local configuration flags for individual xDS implementations. + opts config.Opts } +// streamWrapper abstracts specific data points inside a stream so we can access them +// throughout various code paths in an xDS stream lifecycle. This comes in handy when dealing +// with varying implementation types such as ordered vs unordered. type streamWrapper struct { - stream stream.Stream // parent stream object - ID int64 // stream ID in relation to total stream count - nonce int64 // nonce per stream - watches watches - callbacks Callbacks - streamState stream.StreamState + stream stream.Stream // parent stream object + ID int64 // stream ID in relation to total stream count + nonce int64 // nonce per stream + node *core.Node + watches watches // collection of stack allocated watchers per request type + callbacks Callbacks // callbacks for performing actions through stream lifecycle + + // The below config fields are for tracking resource cache state + streamState stream.StreamState + lastDiscoveryResponses map[string]lastDiscoveryResponse } +// Send packages the necessary resources before sending on the gRPC stream, +// and sets the current state of the world. func (s *streamWrapper) send(resp cache.Response) (string, error) { if resp == nil { return "", errors.New("missing response") diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 3ec28c02b7..871fde1961 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -4,28 +4,34 @@ import ( "reflect" "sync/atomic" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) // process handles a bi-di stream request -func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error { +func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error { + // create our streamWrapper which can be passed down to sub control loops. + // this is useful for abstracting critical information for various types of + // xDS resource processing. sw := streamWrapper{ - stream: str, - ID: atomic.AddInt64(&s.streamCount, 1), // increment stream count - callbacks: s.callbacks, - streamState: stream.NewStreamState(false, map[string]string{}), - watches: newWatches(), + stream: str, + ID: atomic.AddInt64(&s.streamCount, 1), // increment stream count + callbacks: s.callbacks, + node: &core.Node{}, // node may only be set on the first discovery request + + // a collection of stack allocated watches per request type. + watches: newWatches(), + streamState: stream.NewStreamState(false, map[string]string{}), + lastDiscoveryResponses: make(map[string]lastDiscoveryResponse), } - lastDiscoveryResponses := map[string]lastDiscoveryResponse{} - // a collection of stack allocated watches per request type - + // cleanup once our stream has ended. defer func() { sw.watches.close() if s.callbacks != nil { @@ -39,10 +45,9 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } - // node may only be set on the first discovery request - var node = &core.Node{} - - // recompute dynamic channels for this stream + // do an initial recompute so we can load the first 2 channels: + // request + // s.ctx.Done() sw.watches.recompute(s.ctx, reqCh) for { @@ -52,10 +57,12 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // 2...: per type watches index, value, ok := reflect.Select(sw.watches.cases) switch index { - // ctx.Done() -> if we receive a value here we return as no further computation is needed + // ctx.Done() -> if we receive a value here we return + // as no further computation is needed case 0: return nil - // Case 1 handles any request inbound on the stream and handles all initialization as needed + // Case 1 handles any request inbound on the stream + // and handles all initialization as needed case 1: // input stream ended or errored out if !ok { @@ -69,9 +76,9 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // Only first request is guaranteed to hold node info so if it's missing, reassign. if req.Node != nil { - node = req.Node + sw.node = req.Node } else { - req.Node = node + req.Node = sw.node } // nonces can be reused across streams; we verify nonce only if nonce is not initialized @@ -83,15 +90,24 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - // When using ADS we need to order responses. This is guaranteed in the xDS protocol specification - // as ADS is required to be eventually consistent. More details can be found here if interested: + // When using ADS we need to order responses. + // This is guaranteed in the xDS protocol specification + // as ADS is required to be eventually consistent. + // More details can be found here if interested: // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations - sw.streamState.IsOrdered(true) - - // Trigger a different code path specifically for ADS. We want resource ordering - // so things don't get sent before they should. This is a blocking call and - // will exit the process function on successful completion. - return s.processADS(&sw, reqCh, defaultTypeURL) + if s.opts.Ordered { + // send our first request on the stream again so it doesn't get + // lost in processing on the new control loop + go func() { + reqCh <- req + }() + + // Trigger a different code path specifically for ADS. + // We want resource orderingso things don't get sent before they should. + // This is a blocking call and will exit the process function + // on successful completion. + return s.processADS(&sw, reqCh, defaultTypeURL) + } } else if req.TypeUrl == "" { req.TypeUrl = defaultTypeURL } @@ -102,7 +118,7 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } - if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) @@ -124,7 +140,6 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq } } else { // No pre-existing watch exists, let's create one. - // We need to precompute the watches first then open a watch in the cache. sw.watches.addWatch(typeURL, &watch{ cancel: s.cache.CreateWatch(req, sw.streamState, responder), response: responder, @@ -134,7 +149,8 @@ func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryReq // Recompute the dynamic select cases for this stream. sw.watches.recompute(s.ctx, reqCh) default: - // Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL + // Channel n -> these are the dynamic list of + // responders that correspond to the stream request typeURL if !ok { // Receiver channel was closed. TODO(jpeach): probably cancel the watch or something? return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index) diff --git a/pkg/server/stream/v3/stream.go b/pkg/server/stream/v3/stream.go index 5739eeb8cf..77be8bbbdd 100644 --- a/pkg/server/stream/v3/stream.go +++ b/pkg/server/stream/v3/stream.go @@ -35,9 +35,6 @@ type StreamState struct { // nolint:golint,revive // indicates whether the object has beed modified since its creation first bool - - // indicates whether we want an ordered ADS stream or not - ordered bool } // NewStreamState initializes a stream state with empty defaults. @@ -47,7 +44,6 @@ func NewStreamState(wildcard bool, initialResourceVersions map[string]string) St resourceVersions: initialResourceVersions, first: true, knownResourceNames: map[string]map[string]struct{}{}, - ordered: false, // Ordered comes from the first request since that's when we discover if they want ADS } if initialResourceVersions == nil { @@ -79,13 +75,6 @@ func (s *StreamState) IsWildcard() bool { return s.wildcard } -// IsOrdered returns wherther or not the current stream should run as an ordered ADS stream. -// This means less backpressure relief but a guarantee of correct discovery response order. -func (s *StreamState) IsOrdered(ordered bool) bool { - s.ordered = ordered - return ordered -} - // SetKnownResourceNames sets a list of resource names in a stream utilizing the SOTW protocol. func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) { s.knownResourceNames[url] = names diff --git a/pkg/server/v3/server.go b/pkg/server/v3/server.go index 8a3bfd3abe..9106d5a98a 100644 --- a/pkg/server/v3/server.go +++ b/pkg/server/v3/server.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/envoyproxy/go-control-plane/pkg/server/config" "github.com/envoyproxy/go-control-plane/pkg/server/delta/v3" "github.com/envoyproxy/go-control-plane/pkg/server/rest/v3" "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" @@ -162,10 +163,10 @@ func (c CallbackFuncs) OnFetchResponse(req *discovery.DiscoveryRequest, resp *di } // NewServer creates handlers from a config watcher and callbacks. -func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Server { +func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks, opts ...config.XDSOption) Server { return NewServerAdvanced(rest.NewServer(config, callbacks), - sotw.NewServer(ctx, config, callbacks), - delta.NewServer(ctx, config, callbacks), + sotw.NewServer(ctx, config, callbacks, opts...), + delta.NewServer(ctx, config, callbacks, opts...), ) } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 9f5c161122..33d9f44788 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -32,6 +32,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" rsrc "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3" "github.com/envoyproxy/go-control-plane/pkg/server/stream/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/envoyproxy/go-control-plane/pkg/test/resource/v3" @@ -579,7 +580,9 @@ func TestAggregatedHandlers(t *testing.T) { ResourceNames: []string{virtualHostName}, } - s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) + // We create the server with the optional ordered ADS flag so we guarantee resource + // ordering over the stream. + s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithOrderedADS()) go func() { err := s.StreamAggregatedResources(resp) assert.NoError(t, err)