initial ADS code path split so we can order responses for target modes
Signed-off-by: Alec Holmes <[email protected]>
alecholmez committed Mar 3, 2022
1 parent 1ffa321 commit 28c9137
Showing 4 changed files with 385 additions and 181 deletions.
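
The diff below introduces a streamWrapper type and a dedicated processADS control loop; the remaining two changed files, including the caller-side wiring, are not shown here. As orientation only, here is a rough sketch of how the parent process() loop might build a streamWrapper and hand ADS streams to the new ordered path. The dispatch condition and the exact field wiring are assumptions, not code from this commit.

// Hypothetical sketch, not part of this commit: the parent loop handing an
// ADS stream to the new ordered code path. streamWrapper, newWatches and
// processADS come from the diff below; the dispatch condition and field
// wiring here are assumptions.
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
	sw := streamWrapper{
		stream:      str,
		ID:          atomic.AddInt64(&s.streamCount, 1), // unique stream ID
		callbacks:   s.callbacks,
		watches:     newWatches(),
		streamState: stream.NewStreamState(false, map[string]string{}),
	}
	defer func() {
		sw.watches.close()
		if s.callbacks != nil {
			s.callbacks.OnStreamClosed(sw.ID)
		}
	}()

	// ADS streams arrive with the wildcard type URL; route them through the
	// strictly ordered loop in ads.go, everything else keeps the per-type path.
	if defaultTypeURL == resource.AnyType {
		return s.processADS(&sw, reqCh, defaultTypeURL)
	}
	// ... single-type (non-ADS) handling would continue here ...
	return nil
}
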
162 changes: 162 additions & 0 deletions pkg/server/sotw/v3/ads.go
@@ -0,0 +1,162 @@
package sotw

import (
"fmt"
"reflect"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

// processADS handles a bi-di stream request on the ordered ADS code path
func (s *server) processADS(sw *streamWrapper, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// process := func(resp cache.Response) error {
// nonce, err := send(resp)
// if err != nil {
// return err
// }
// typeUrl := resp.GetRequest().TypeUrl
// watches.nonces[typeUrl] = nonce
// return nil
// }

// processAllExcept := func(typeUrl string) error {
// for {
// select {
// case resp := <-values.responses:
// if resp.GetRequest().TypeUrl != typeUrl {
// if err := process(resp); err != nil {
// return err
// }
// }
// default:
// return nil
// }
// }
// }

// node may only be set on the first discovery request
var node = &core.Node{}

// This control loop strictly orders resources when running in ADS mode.
// It should be treated as a child process of the original process() loop
// and should return on close of stream or error. This will cause the cleanup routines
// in the parent process() loop to execute.
for {
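// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches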
fmt.Printf("%+v\n", sw.watches.cases)
index, value, ok := reflect.Select(sw.watches.cases)
fmt.Println(index)
fmt.Println(value)
fmt.Println(ok)

switch index {
case 0:
fmt.Println("received a Done() on the ADS code path.")
// ctx.Done() -> no further computation is needed
return nil
case 1:
fmt.Println("This is the ordered ADS code path request pipeline")
if !ok {
return nil
}

req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}

// Only the first request is guaranteed to hold node info, so if it's missing, reassign.
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(sw.ID, req); err != nil {
return err
}
}

// if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
// if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// // Let's record Resource names that a client has received.
// streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
// }
// }

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := sw.watches.responders[typeURL]; ok {
// We've found a pre-existing watch; let's check it and update if needed.
// If these requirements aren't satisfied, leave the existing watch open.
if w.nonce == "" || w.nonce == nonce {
w.close()

sw.watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder),
response: responder,
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
sw.watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, stream.StreamState{}, responder),
response: responder,
})
}

// Recompute the dynamic select cases for this stream.
sw.watches.recompute(s.ctx, reqCh)
default:
fmt.Println("This is the ordered ADS code path default pipeline")

// Channel n -> the dynamic list of responders that corresponds to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}

res := value.Interface().(cache.Response)
nonce, err := sw.send(res)
if err != nil {
return err
}

sw.watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
}

// typeUrl := req.TypeUrl
// responseNonce, seen := values.nonces[typeUrl]
// if !seen || responseNonce == nonce {
// if cancel, seen := values.cancellations[typeUrl]; seen {
// if cancel != nil {
// cancel()
// }
// if err := processAllExcept(typeUrl); err != nil {
// return err
// }
// }
// values.cancellations[typeUrl] = s.cache.CreateWatch(req, values.responses)
// }
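
The loop above multiplexes two fixed control channels (ctx.Done and the request channel) with a per-type set of watch channels whose count is only known at runtime, which is why it uses reflect.Select rather than a static select statement. A minimal, self-contained illustration of that pattern (not repo code; channel names are made up):

package main

import (
	"fmt"
	"reflect"
)

func main() {
	done := make(chan struct{})   // case 0: stream context cancelled
	reqCh := make(chan string, 1) // case 1: inbound discovery requests
	perType := []chan string{     // cases 2..n: one responder per type URL
		make(chan string, 1),
		make(chan string, 1),
	}

	cases := []reflect.SelectCase{
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(done)},
		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(reqCh)},
	}
	for _, ch := range perType {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}

	perType[1] <- "cluster response"
	index, value, ok := reflect.Select(cases) // blocks until any case is ready
	fmt.Println(index, value.Interface(), ok) // prints: 3 cluster response true
}
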
199 changes: 35 additions & 164 deletions pkg/server/sotw/v3/server.go
@@ -18,17 +18,10 @@ package sotw
import (
"context"
"errors"
"reflect"
"strconv"
"sync/atomic"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)

@@ -63,173 +56,51 @@ type server struct {
streamCount int64
}

// Discovery response that is sent over GRPC stream
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
// regardless current snapshot version (even if it is not changed yet)
type lastDiscoveryResponse struct {
nonce string
resources map[string]struct{}
type streamWrapper struct {
stream stream.Stream // parent stream object
ID int64 // stream ID in relation to total stream count
nonce int64 // nonce per stream
watches watches
callbacks Callbacks
streamState stream.StreamState
}

// process handles a bi-di stream request
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
streamID := atomic.AddInt64(&s.streamCount, 1)

// unique nonce generator for req-resp pairs per xDS stream; the server
// ignores stale nonces. nonce is only modified within send() function.
var streamNonce int64

streamState := stream.NewStreamState(false, map[string]string{})
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}

// a collection of stack allocated watches per request type
watches := newWatches()

defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
}()

// sends a response by serializing to protobuf Any
send := func(resp cache.Response) (string, error) {
if resp == nil {
return "", errors.New("missing response")
}

out, err := resp.GetDiscoveryResponse()
if err != nil {
return "", err
}
func (s *streamWrapper) send(resp cache.Response) (string, error) {
if resp == nil {
return "", errors.New("missing response")
}

// increment nonce
streamNonce = streamNonce + 1
out.Nonce = strconv.FormatInt(streamNonce, 10)
out, err := resp.GetDiscoveryResponse()
if err != nil {
return "", err
}

lastResponse := lastDiscoveryResponse{
nonce: out.Nonce,
resources: make(map[string]struct{}),
}
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
// increment nonce
s.nonce = s.nonce + 1
out.Nonce = strconv.FormatInt(s.nonce, 10)

if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
}
return out.Nonce, str.Send(out)
lastResponse := lastDiscoveryResponse{
nonce: out.Nonce,
resources: make(map[string]struct{}),
}
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
// lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse

if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
}
s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out)
}
return out.Nonce, s.stream.Send(out)
}

// node may only be set on the first discovery request
var node = &core.Node{}

// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)

for {
// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches
index, value, ok := reflect.Select(watches.cases)
switch index {
// ctx.Done() -> if we receive a value here we return as no further computation is needed
case 0:
return nil
// Case 1 handles any request inbound on the stream and handles all initialization as needed
case 1:
// input stream ended or errored out
if !ok {
return nil
}

req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}

// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}

// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()

// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}

if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
}
}

if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
}
}

typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.close()

watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}

// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}

res := value.Interface().(cache.Response)
nonce, err := send(res)
if err != nil {
return err
}

watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
// Discovery response that is sent over the gRPC stream.
// We need to record which resource names have already been sent to a client
// so that, if the client requests a new name, we can respond
// regardless of the current snapshot version (even if it has not changed yet).
type lastDiscoveryResponse struct {
nonce string
resources map[string]struct{}
}

// StreamHandler converts a blocking read call to channels and initiates stream processing