From 3d60d9963dcbee2ee0df92fe881b61613b06bb7d Mon Sep 17 00:00:00 2001 From: Clayton Gonsalves Date: Mon, 18 Sep 2023 15:16:16 +0200 Subject: [PATCH] add tests Signed-off-by: Clayton Gonsalves --- internal/xdscache/v3/endpointstranslator.go | 30 ++++++++-------- internal/xdscache/v3/server_test.go | 38 +++++++++++++++++++++ 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/internal/xdscache/v3/endpointstranslator.go b/internal/xdscache/v3/endpointstranslator.go index 675f426f6b0..cef63cb07e3 100644 --- a/internal/xdscache/v3/endpointstranslator.go +++ b/internal/xdscache/v3/endpointstranslator.go @@ -38,16 +38,16 @@ type LoadBalancingEndpoint = envoy_endpoint_v3.LbEndpoint // RecalculateEndpoints generates a slice of LoadBalancingEndpoint // resources by matching the given service port to the given v1.Endpoints. -// ep may be nil, in which case, the result is also nil. -func RecalculateEndpoints(port, healthPort v1.ServicePort, ep *v1.Endpoints) []*LoadBalancingEndpoint { - if ep == nil { +// eps may be nil, in which case, the result is also nil. +func RecalculateEndpoints(port, healthPort v1.ServicePort, eps *v1.Endpoints) []*LoadBalancingEndpoint { + if eps == nil { return nil } var lb []*LoadBalancingEndpoint var healthCheckPort int32 - for _, s := range ep.Subsets { + for _, s := range eps.Subsets { // Skip subsets without ready addresses. if len(s.Addresses) < 1 { continue @@ -199,16 +199,16 @@ func (c *EndpointsCache) SetClusters(clusters []*dag.ServiceCluster) error { return nil } -// UpdateEndpoint adds ep to the cache, or replaces it if it is +// UpdateEndpoint adds eps to the cache, or replaces it if it is // already cached. Any ServiceClusters that are backed by a Service -// that ep belongs become stale. Returns a boolean indicating whether -// any ServiceClusters use ep or not. -func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool { +// that eps belongs become stale. Returns a boolean indicating whether +// any ServiceClusters use eps or not. +func (c *EndpointsCache) UpdateEndpoint(eps *v1.Endpoints) bool { c.mu.Lock() defer c.mu.Unlock() - name := k8s.NamespacedNameOf(ep) - c.endpoints[name] = ep.DeepCopy() + name := k8s.NamespacedNameOf(eps) + c.endpoints[name] = eps.DeepCopy() // If any service clusters include this endpoint, mark them // all as stale. @@ -220,14 +220,14 @@ func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool { return false } -// DeleteEndpoint deletes ep from the cache. Any ServiceClusters -// that are backed by a Service that ep belongs become stale. Returns -// a boolean indicating whether any ServiceClusters use ep or not. -func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) bool { +// DeleteEndpoint deletes eps from the cache. Any ServiceClusters +// that are backed by a Service that eps belongs become stale. Returns +// a boolean indicating whether any ServiceClusters use eps or not. +func (c *EndpointsCache) DeleteEndpoint(eps *v1.Endpoints) bool { c.mu.Lock() defer c.mu.Unlock() - name := k8s.NamespacedNameOf(ep) + name := k8s.NamespacedNameOf(eps) delete(c.endpoints, name) // If any service clusters include this endpoint, mark them diff --git a/internal/xdscache/v3/server_test.go b/internal/xdscache/v3/server_test.go index 54d845e06f0..8088dc578a6 100644 --- a/internal/xdscache/v3/server_test.go +++ b/internal/xdscache/v3/server_test.go @@ -30,6 +30,7 @@ import ( "github.com/projectcontour/contour/internal/contour" "github.com/projectcontour/contour/internal/dag" "github.com/projectcontour/contour/internal/fixture" + "github.com/projectcontour/contour/internal/ref" "github.com/projectcontour/contour/internal/xds" contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3" "github.com/projectcontour/contour/internal/xdscache" @@ -39,6 +40,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" networking_v1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -48,6 +50,7 @@ func TestGRPC(t *testing.T) { // tr and et is recreated before the start of each test. var et *EndpointsTranslator var eh *contour.EventHandler + var est *EndpointSliceTranslator tests := map[string]func(*testing.T, *grpc.ClientConn){ "StreamClusters": func(t *testing.T, cc *grpc.ClientConn) { @@ -104,6 +107,39 @@ func TestGRPC(t *testing.T) { checkrecv(t, stream) // check we receive one notification checktimeout(t, stream) // check that the second receive times out }, + "StreamEndpointSlices": func(t *testing.T, cc *grpc.ClientConn) { + et.OnAdd(&discoveryv1.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kube-scheduler", + Namespace: "kube-system", + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ + { + Addresses: []string{ + "130.211.139.167", + }, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Port: ref.To[int32](80), + }, + { + Port: ref.To[int32](80), + }, + }, + }, false) + + eds := envoy_service_endpoint_v3.NewEndpointDiscoveryServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + stream, err := eds.StreamEndpoints(ctx) + require.NoError(t, err) + sendreq(t, stream, resource.EndpointType) // send initial notification + checkrecv(t, stream) // check we receive one notification + checktimeout(t, stream) // check that the second receive times out + }, "StreamListeners": func(t *testing.T, cc *grpc.ClientConn) { // add an ingress, which will create a non tls listener eh.OnAdd(&networking_v1.Ingress{ @@ -200,12 +236,14 @@ func TestGRPC(t *testing.T) { for name, fn := range tests { t.Run(name, func(t *testing.T) { et = NewEndpointsTranslator(fixture.NewTestLogger(t)) + est = NewEndpointSliceTranslator(fixture.NewTestLogger(t)) resources := []xdscache.ResourceCache{ &ListenerCache{}, &SecretCache{}, &RouteCache{}, &ClusterCache{}, + est, et, &RuntimeCache{}, }