Skip to content

Commit

Permalink
[fallback] Set xDS configuration from the client (#116)
Browse files Browse the repository at this point in the history
Instead of hardcoding resource values in the control plane
implementation, have test runner set the resources for each test case.
  • Loading branch information
eugeneo authored Aug 8, 2024
1 parent 53d1d8e commit 8edbdb7
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 330 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/psm-interop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ jobs:
protos/grpc/testing/empty.proto
protos/grpc/testing/messages.proto
protos/grpc/testing/test.proto
protos/grpc/testing/xdsconfig/control.proto
protos/grpc/testing/xdsconfig/service.proto
protos/grpc/testing/xdsconfig/xdsconfig.proto
- name: "Run unit tests"
run: python -m tests.unit
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,5 @@ venv/
venv-*/
out/
protos/**/*_pb2*
# Generated Go files
docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig/*.pb.go
10 changes: 10 additions & 0 deletions docker/go-control-plane/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ the root of `grpc/psm-interop` checkout.
```
docker build . -f docker/go-control-plane/Dockerfile
```

## Local development

Run the following command from this repository to generate code from the .proto
files:
```
protoc -I=. --go_out=docker/go-control-plane \
protos/grpc/testing/xdsconfig/*.proto \
--go-grpc_out=docker/go-control-plane/
```
124 changes: 0 additions & 124 deletions docker/go-control-plane/controlplane/resource.go

This file was deleted.

120 changes: 40 additions & 80 deletions docker/go-control-plane/fallback-control-plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,16 @@ import (
"log"
"net"
"os"
"strconv"
"sync"

"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/grpc/psm-interop/docker/go-control-plane/controlplane"
xdsconfigpb "github.com/grpc/psm-interop/docker/go-control-plane/grpc/interop/grpc_testing/xdsconfig"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
Expand All @@ -52,14 +48,13 @@ import (
)

var (
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
upstream = flag.String("upstream", "localhost:3000", "upstream server")
port = flag.Uint("port", 3333, "Port to listen on")
nodeid = flag.String("nodeid", "test-id", "Node ID")
)

type Filter struct {
ResourceType string
ResourceName string
type resourceKey struct {
resourceType string
resourceName string
}

// controlService provides a gRPC API to configure test-specific control plane
Expand All @@ -68,8 +63,7 @@ type controlService struct {
xdsconfigpb.UnsafeXdsConfigControlServiceServer
version uint32
mu sync.Mutex // Guards access to all fields listed below
clusters map[string]*v3clusterpb.Cluster
listeners map[string]*v3listenerpb.Listener
resources map[resourceKey]*proto.Message
filters map[string]map[string]bool
cache cache.SnapshotCache
}
Expand All @@ -79,52 +73,55 @@ type controlService struct {
func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.StopOnRequestRequest) (*xdsconfigpb.StopOnRequestResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
if val, ok := srv.filters[req.GetResourceType()]; ok {
val[req.GetResourceName()] = true
if val, ok := srv.filters[req.TypeUrl]; ok {
val[req.Name] = true
} else {
srv.filters[req.GetResourceType()] = map[string]bool{req.GetResourceName(): true}
srv.filters[req.TypeUrl] = map[string]bool{req.Name: true}
}
res := xdsconfigpb.StopOnRequestResponse{}
for t, names := range srv.filters {
for name, _ := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
for name := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{TypeUrl: t, Name: name})
}
}
return &res, nil
}

// UpsertResources allows the test to provide a new or replace existing xDS
// resource. Notification will be sent to any control plane clients watching
// SetResources allows the test to provide a new or replace existing xDS
// resources. Notification will be sent to any control plane clients watching
// the resource being updated.
func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.UpsertResourcesRequest) (*xdsconfigpb.UpsertResourcesResponse, error) {
func (srv *controlService) SetResources(_ context.Context, req *xdsconfigpb.SetResourcesRequest) (*xdsconfigpb.SetResourcesResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.version++
listener := controlplane.ListenerName
if req.Listener != nil {
listener = *req.Listener
if len(req.Resources) > 0 {
srv.version++
}
for _, resource := range req.Resources {
key := resourceKey{resourceType: resource.TypeUrl, resourceName: resource.Name}
contents := resource.Body
if contents == nil {
delete(srv.resources, key)
continue
}
body, err := contents.UnmarshalNew()
if err != nil {
log.Printf("Failed to parse %s/%s: %v", key.resourceType, key.resourceName, err)
continue
}
srv.resources[key] = &body
}
srv.clusters[req.Cluster] = controlplane.MakeCluster(req.Cluster, req.UpstreamHost, req.UpstreamPort)
srv.listeners[listener] = controlplane.MakeHTTPListener(listener, req.Cluster)
if err := srv.RefreshSnapshot(); err != nil {
return nil, err
}
res := &xdsconfigpb.UpsertResourcesResponse{}
for _, l := range srv.listeners {
a, err := anypb.New(l)
res := xdsconfigpb.SetResourcesResponse{}
for key, message := range srv.resources {
a, err := anypb.New(*message)
if err != nil {
log.Fatalf("Failed to convert listener %v to pb: %v\n", l, err)
log.Printf("Can not wrap resource %s/%s into any: %v", key.resourceType, key.resourceName, err)
}
res.Resource = append(res.Resource, a)
}
for _, c := range srv.clusters {
a, err := anypb.New(c)
if err != nil {
log.Fatalf("Failed to convert cluster %v to pb: %v\n", c, err)
}
res.Resource = append(res.Resource, a)
}
return res, nil
return &res, nil
}

// Abruptly stops the server when the client requests a resource that the test
Expand All @@ -146,31 +143,15 @@ func (srv *controlService) onStreamRequest(id int64, req *v3discoverypb.Discover
}

func (srv *controlService) RefreshSnapshot() error {
var listeners []types.Resource
for _, l := range srv.listeners {
listeners = append(listeners, l)
}
var clusters []types.Resource
for _, c := range srv.clusters {
clusters = append(clusters, c)
resources := map[resource.Type][]types.Resource{}
for k, resource := range srv.resources {
resources[k.resourceType] = append(resources[k.resourceType], *resource)
}
resources := map[resource.Type][]types.Resource{resource.ListenerType: listeners, resource.ClusterType: clusters}
// Create the snapshot that we'll serve to Envoy
snapshot, err := cache.NewSnapshot(fmt.Sprint(srv.version), resources)
if err != nil {
return err
}
log.Printf("Snapshot contents:\n")
for _, values := range snapshot.Resources {
for name, item := range values.Items {
text, err := protojson.MarshalOptions{Multiline: true}.Marshal(item.Resource)
if err != nil {
log.Printf("Resource %v, error: %v\n", name, err)
continue
}
log.Printf("%v => %v\n", name, string(text))
}
}
if err := snapshot.Consistent(); err != nil {
log.Printf("Snapshot inconsistency: %v\n", err)
return err
Expand All @@ -183,7 +164,6 @@ func (srv *controlService) RefreshSnapshot() error {
return nil
}


func (srv *controlService) RunServer(port uint) error {
if err := srv.RefreshSnapshot(); err != nil {
log.Fatalf("Failed to refresh snapshot: %v\n", err)
Expand All @@ -208,32 +188,12 @@ func (srv *controlService) RunServer(port uint) error {
return nil
}

func parseHostPort(host_port string) (string, uint32, error) {
host, upstreamPort, err := net.SplitHostPort(*upstream)
if err != nil {
return "", 0, fmt.Errorf("Incorrect upstream host name: %s: %v\n", host_port, err)
}
parsedUpstreamPort, err := strconv.Atoi(upstreamPort)
if err != nil || parsedUpstreamPort <= 0 {
return "", 0, fmt.Errorf("Not a valid port number: %d: %v\n", upstreamPort, err)
}
return host, uint32(parsedUpstreamPort), nil
}


// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
flag.Parse()
host, upstreamPort, err := parseHostPort(*upstream)
if err != nil {
log.Fatalf("Incorrect upstream host name: %s: %v\n", upstream, err)
}
initial_cds := controlplane.MakeCluster(controlplane.ClusterName, host, upstreamPort)
initial_lds := controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)
controlService := &controlService{version: 1,
clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: initial_cds},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: initial_lds},
resources: map[resourceKey]*proto.Message{},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
Expand Down
Loading

0 comments on commit 8edbdb7

Please sign in to comment.