Skip to content

Commit

Permalink
implement functional opts pattern to support xds options
Browse files Browse the repository at this point in the history
Signed-off-by: Alec Holmes <[email protected]>
  • Loading branch information
alecholmez committed Mar 7, 2022
1 parent 28c9137 commit c85611c
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 118 deletions.
25 changes: 25 additions & 0 deletions pkg/server/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package config

// Opts for individual xDS implementations that can be
// utilized through the functional opts pattern
type Opts struct {
// If true respond to ADS requests with a guaranteed resource ordering
Ordered bool
}

func NewOpts() Opts {
return Opts{
Ordered: false,
}
}

// Each xDS implementation should implement their own functional opts.
// It is recommended that config values be added in this package specifically,
// but the individual opts functions should be in their respective
// implementation package so the import looks like the following:
//
// `sotw.WithOrderedADS()`
// `delta.WithOrderedADS()`
//
// this allows for easy inference as to which opt applies to what implementation.
type XDSOption func(*Opts)
22 changes: 22 additions & 0 deletions pkg/server/config/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Config abstracts xDS server options into a unified configuration package
that allows for easy manipulation as well as unified passage of options
to individual xDS server implementations.
This enables code reduction as well as a unified source of config. Delta
and SOTW might have similar ordered responses through ADS and rather than
duplicating the logic across server implementations, we add the options
in this package which are passed down to each individual spec.
Each xDS implementation should implement their own functional opts.
It is recommended that config values be added in this package specifically,
but the individual opts functions should be in their respective
implementation package so the import looks like the following:
`sotw.WithOrderedADS()`
`delta.WithOrderedADS()`
this allows for easy inference as to which opt applies to what implementation.
*/

package config
3 changes: 2 additions & 1 deletion pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ type server struct {
}

// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
return &server{
cache: config,
callbacks: callbacks,
Expand Down
114 changes: 50 additions & 64 deletions pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,62 @@
package sotw

import (
"fmt"
"reflect"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// process handles a bi-di stream request
func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// process := func(resp cache.Response) error {
// nonce, err := send(resp)
// if err != nil {
// return err
// }
// typeUrl := resp.GetRequest().TypeUrl
// watches.nonces[typeUrl] = nonce
// return nil
// }

// processAllExcept := func(typeUrl string) error {
// for {
// select {
// case resp := <-values.responses:
// if resp.GetRequest().TypeUrl != typeUrl {
// if err := process(resp); err != nil {
// return err
// }
// }
// default:
// return nil
// }
// }
// }

// node may only be set on the first discovery request
var node = &core.Node{}
func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
process := func(resp cache.Response) error {
nonce, err := sw.send(resp)
if err != nil {
return err
}

typeURL := resp.GetRequest().TypeUrl
sw.watches.responders[typeURL].nonce = nonce
return nil
}

processAllExcept := func(typeURL string) error {
for {
index, value, ok := reflect.Select(sw.watches.cases)
// index is checked because if we receive a value
// from the Done() or request channel here
// we can ignore. The main control loop will handle that accordingly.
// This is strictly for handling incoming resources off the dynamic channels list.
if !ok || index < 2 {
// We just exit and assume the ordered parent resource isn't ready if !ok
return nil
}

res := value.Interface().(cache.Response)
if res.GetRequest().TypeUrl != typeURL {
if err := process(res); err != nil {
return err
}
}
}
}

// This control loop strictly orders resources when running in ADS mode.
// It should be treated as a child process of the original process() loop
// and should return on close of stream or error. This will cause the cleanup routines
// in the parent process() loop to execute.
// and should return on close of stream or error.
// This will cause the cleanup routinesin the parent process() loop to execute.
for {
fmt.Printf("%+v\n", sw.watches.cases)
index, value, ok := reflect.Select(sw.watches.cases)
fmt.Println(index)
fmt.Println(value)
fmt.Println(ok)

switch index {
case 0:
fmt.Println("recieved a Done() on the ADS code path.")
// ctx.Done() -> no further computation is needed
return nil
case 1:
fmt.Println("This is the ordered ADS code path request pipeline")
if !ok {
return nil
}
Expand All @@ -73,9 +68,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery

// Only first request is guaranteed to hold node info so if it's missing, reassign.
if req.Node != nil {
node = req.Node
sw.node = req.Node
} else {
req.Node = node
req.Node = sw.node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
Expand All @@ -96,12 +91,12 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery
}
}

// if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
// if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// // Let's record Resource names that a client has received.
// streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
// }
// }
if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
}
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
Expand All @@ -111,6 +106,11 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery
if w.nonce == "" || w.nonce == nonce {
w.close()

// Only process if we have an existing watch otherwise go ahead and create.
if err := processAllExcept(typeURL); err != nil {
return err
}

sw.watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder),
response: responder,
Expand All @@ -128,9 +128,9 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery
// Recompute the dynamic select cases for this stream.
sw.watches.recompute(s.ctx, reqCh)
default:
fmt.Println("This is the ordered ADS code path default pipeline")

// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
// Channel n -> these are the dynamic list of responders
// that correspond to the stream request typeURL
// No special processing here for ordering. We just send on the channels.
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
Expand All @@ -146,17 +146,3 @@ func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.Discovery
}
}
}

// typeUrl := req.TypeUrl
// responseNonce, seen := values.nonces[typeUrl]
// if !seen || responseNonce == nonce {
// if cancel, seen := values.cancellations[typeUrl]; seen {
// if cancel != nil {
// cancel()
// }
// if err := processAllExcept(typeUrl); err != nil {
// return err
// }
// }
// values.cancellations[typeUrl] = s.cache.CreateWatch(req, values.responses)
// }
45 changes: 37 additions & 8 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"errors"
"strconv"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/config"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

Expand All @@ -43,8 +45,23 @@ type Callbacks interface {
}

// NewServer creates handlers from a config watcher and callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
return &server{cache: config, callbacks: callbacks, ctx: ctx}
func NewServer(ctx context.Context, cw cache.ConfigWatcher, callbacks Callbacks, opts ...config.XDSOption) Server {
s := &server{cache: cw, callbacks: callbacks, ctx: ctx, opts: config.NewOpts()}

// Parse through our options
for _, opt := range opts {
opt(&s.opts)
}

return s
}

// WithOrderedADS enables the internal flag to order responces
// strictly.
func WithOrderedADS() config.XDSOption {
return func(o *config.Opts) {
o.Ordered = true
}
}

type server struct {
Expand All @@ -54,17 +71,29 @@ type server struct {

// streamCount for counting bi-di streams
streamCount int64

// Local configuration flags for individual xDS implementations.
opts config.Opts
}

// streamWrapper abstracts specific data points inside a stream so we can access them
// throughout various code paths in an xDS stream lifecycle. This comes in handy when dealing
// with varying implementation types such as ordered vs unordered.
type streamWrapper struct {
stream stream.Stream // parent stream object
ID int64 // stream ID in relation to total stream count
nonce int64 // nonce per stream
watches watches
callbacks Callbacks
streamState stream.StreamState
stream stream.Stream // parent stream object
ID int64 // stream ID in relation to total stream count
nonce int64 // nonce per stream
node *core.Node
watches watches // collection of stack allocated watchers per request type
callbacks Callbacks // callbacks for performing actions through stream lifecycle

// The below config fields are for tracking resource cache state
streamState stream.StreamState
lastDiscoveryResponses map[string]lastDiscoveryResponse
}

// Send packages the necessary resources before sending on the gRPC stream,
// and sets the current state of the world.
func (s *streamWrapper) send(resp cache.Response) (string, error) {
if resp == nil {
return "", errors.New("missing response")
Expand Down
Loading

0 comments on commit c85611c

Please sign in to comment.