Skip to content

Commit

Permalink
multiplex responses over a single channel rather than selecting over …
Browse files Browse the repository at this point in the history
…dynamic channels in ordered ads

Signed-off-by: Alec Holmes <[email protected]>
  • Loading branch information
alecholmez committed Mar 28, 2022
1 parent 9415699 commit 265f0c3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 51 deletions.
80 changes: 34 additions & 46 deletions pkg/server/sotw/v3/ads.go
Original file line number Diff line number Diff line change
@@ -1,67 +1,66 @@
package sotw

import (
"reflect"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// process handles a bi-di stream request
func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// We make a responder channel here so we can multiplex responses from the dynamic channels.
sw.watches.addWatch(resource.AnyType, &watch{
cancel: nil,
nonce: "",
// Create a buffered channel the size of the known resource types.
response: make(chan cache.Response, types.UnknownType),
})

process := func(resp cache.Response) error {
nonce, err := sw.send(resp)
if err != nil {
return err
}

typeURL := resp.GetRequest().TypeUrl
sw.watches.responders[typeURL].nonce = nonce
sw.watches.responders[resp.GetRequest().TypeUrl].nonce = nonce
return nil
}

processAllExcept := func(typeURL string) error {
for {
index, value, ok := reflect.Select(sw.watches.cases)
// index is checked because if we receive a value
// from the Done() or request channel here
// we can ignore. The main control loop will handle that accordingly.
// This is strictly for handling incoming resources off the dynamic channels list.
if !ok || index < 2 {
// We just exit and assume the ordered parent resource isn't ready if !ok
return nil
}

res := value.Interface().(cache.Response)
if res.GetRequest().TypeUrl != typeURL {
if err := process(res); err != nil {
return err
select {
// We watch the multiplexed ADS channel for incoming responses.
case res := <-sw.watches.responders[resource.AnyType].response:
if res.GetRequest().TypeUrl != typeURL {
if err := process(res); err != nil {
return err
}
}
default:
return nil
}
}
}

// This control loop strictly orders resources when running in ADS mode.
// It should be treated as a child process of the original process() loop
// and should return on close of stream or error.
// This will cause the cleanup routinesin the parent process() loop to execute.
// This will cause the cleanup routines in the parent process() loop to execute.
for {
index, value, ok := reflect.Select(sw.watches.cases)
switch index {
case 0:
// ctx.Done() -> no further computation is needed
select {
case <-s.ctx.Done():
return nil
case 1:
case req, ok := <-reqCh:
// Input stream ended or failed.
if !ok {
return nil
}

req := value.Interface().(*discovery.DiscoveryRequest)
// Received an empty request over the request channel. Can't respond.
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
Expand All @@ -73,7 +72,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
req.Node = sw.node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
// Nonces can be reused across streams; we verify nonce only if nonce is not initialized.
nonce := req.GetResponseNonce()

// type URL is required for ADS but is implicit for xDS
Expand All @@ -99,7 +98,8 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
// Use the multiplexed channel for new watches.
responder := sw.watches.responders[resource.AnyType].response
if w, ok := sw.watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
Expand All @@ -112,37 +112,25 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe
}

sw.watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder),
cancel: s.cache.CreateWatch(req, sw.streamState, responder),
response: responder,
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
sw.watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder),
cancel: s.cache.CreateWatch(req, sw.streamState, responder),
response: responder,
})
}

// Recompute the dynamic select cases for this stream.
sw.watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders
// that correspond to the stream request typeURL
// No special processing here for ordering. We just send on the channels.
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}

res := value.Interface().(cache.Response)
nonce, err := sw.send(res)
// We only watch the multiplexed channel since all values will come through from process.
case res := <-sw.watches.responders[resource.AnyType].response:
err := process(res)
if err != nil {
return err
return status.Errorf(codes.Unavailable, err.Error())
}

sw.watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
}
5 changes: 3 additions & 2 deletions pkg/server/sotw/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ type streamWrapper struct {
watches watches // collection of stack allocated watchers per request type
callbacks Callbacks // callbacks for performing actions through stream lifecycle

// The below config fields are for tracking resource cache state
// The below fields are used for tracking resource
// cache state and should be maintained per stream.
streamState stream.StreamState
lastDiscoveryResponses map[string]lastDiscoveryResponse
}
Expand Down Expand Up @@ -123,7 +124,7 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) {
return out.Nonce, s.stream.Send(out)
}

// Discovery response that is sent over GRPC stream
// Discovery response that is sent over GRPC stream.
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
// regardless current snapshot version (even if it is not changed yet)
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (w *watches) recompute(ctx context.Context, req <-chan *discovery.Discovery
}
}

// watch contains the necessary modifiables for receiving resource responses
// watch contains the necessary modifiable data for receiving resource responses
type watch struct {
cancel func()
nonce string
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/sotw/v3/xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque
// Case 1 handles any request inbound on the stream
// and handles all initialization as needed
case 1:
// input stream ended or errored out
// input stream ended or failed
if !ok {
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/test/v3/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Callbacks struct {
Debug bool
Fetches int
Requests int
Responses int
DeltaRequests int
DeltaResponses int
mu sync.Mutex
Expand All @@ -21,7 +22,7 @@ type Callbacks struct {
func (cb *Callbacks) Report() {
cb.mu.Lock()
defer cb.mu.Unlock()
log.Printf("server callbacks fetches=%d requests=%d\n", cb.Fetches, cb.Requests)
log.Printf("server callbacks fetches=%d requests=%d responses=%d\n", cb.Fetches, cb.Requests, cb.Responses)
}

func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
Expand Down Expand Up @@ -70,6 +71,9 @@ func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *discov
if cb.Debug {
log.Printf("responding on stream %d with %s:%s", id, res.GetVersionInfo(), res.GetTypeUrl())
}
cb.mu.Lock()
defer cb.mu.Unlock()
cb.Responses++
}

func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *discovery.DeltaDiscoveryRequest, res *discovery.DeltaDiscoveryResponse) {
Expand Down
2 changes: 2 additions & 0 deletions sample/bootstrap-ads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ node:
static_resources:
clusters:
- connect_timeout: 30s
dns_lookup_family: V4_PREFERRED
load_assignment:
cluster_name: xds_cluster
endpoints:
Expand All @@ -37,6 +38,7 @@ static_resources:
http2_protocol_options: {}
name: xds_cluster
- connect_timeout: 30s
dns_lookup_family: V4_PREFERRED
load_assignment:
cluster_name: als_cluster
endpoints:
Expand Down

0 comments on commit 265f0c3

Please sign in to comment.