Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Clayton Gonsalves <[email protected]>
  • Loading branch information
clayton-gonsalves committed Sep 18, 2023
1 parent 6bc5b28 commit 3d60d99
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 15 deletions.
30 changes: 15 additions & 15 deletions internal/xdscache/v3/endpointstranslator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ type LoadBalancingEndpoint = envoy_endpoint_v3.LbEndpoint

// RecalculateEndpoints generates a slice of LoadBalancingEndpoint
// resources by matching the given service port to the given v1.Endpoints.
// ep may be nil, in which case, the result is also nil.
func RecalculateEndpoints(port, healthPort v1.ServicePort, ep *v1.Endpoints) []*LoadBalancingEndpoint {
if ep == nil {
// eps may be nil, in which case, the result is also nil.
func RecalculateEndpoints(port, healthPort v1.ServicePort, eps *v1.Endpoints) []*LoadBalancingEndpoint {
if eps == nil {
return nil
}

var lb []*LoadBalancingEndpoint
var healthCheckPort int32

for _, s := range ep.Subsets {
for _, s := range eps.Subsets {
// Skip subsets without ready addresses.
if len(s.Addresses) < 1 {
continue
Expand Down Expand Up @@ -199,16 +199,16 @@ func (c *EndpointsCache) SetClusters(clusters []*dag.ServiceCluster) error {
return nil
}

// UpdateEndpoint adds ep to the cache, or replaces it if it is
// UpdateEndpoint adds eps to the cache, or replaces it if it is
// already cached. Any ServiceClusters that are backed by a Service
// that ep belongs become stale. Returns a boolean indicating whether
// any ServiceClusters use ep or not.
func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool {
// that eps belongs become stale. Returns a boolean indicating whether
// any ServiceClusters use eps or not.
func (c *EndpointsCache) UpdateEndpoint(eps *v1.Endpoints) bool {
c.mu.Lock()
defer c.mu.Unlock()

name := k8s.NamespacedNameOf(ep)
c.endpoints[name] = ep.DeepCopy()
name := k8s.NamespacedNameOf(eps)
c.endpoints[name] = eps.DeepCopy()

// If any service clusters include this endpoint, mark them
// all as stale.
Expand All @@ -220,14 +220,14 @@ func (c *EndpointsCache) UpdateEndpoint(ep *v1.Endpoints) bool {
return false
}

// DeleteEndpoint deletes ep from the cache. Any ServiceClusters
// that are backed by a Service that ep belongs become stale. Returns
// a boolean indicating whether any ServiceClusters use ep or not.
func (c *EndpointsCache) DeleteEndpoint(ep *v1.Endpoints) bool {
// DeleteEndpoint deletes eps from the cache. Any ServiceClusters
// that are backed by a Service that eps belongs become stale. Returns
// a boolean indicating whether any ServiceClusters use eps or not.
func (c *EndpointsCache) DeleteEndpoint(eps *v1.Endpoints) bool {
c.mu.Lock()
defer c.mu.Unlock()

name := k8s.NamespacedNameOf(ep)
name := k8s.NamespacedNameOf(eps)
delete(c.endpoints, name)

// If any service clusters include this endpoint, mark them
Expand Down
38 changes: 38 additions & 0 deletions internal/xdscache/v3/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/projectcontour/contour/internal/contour"
"github.com/projectcontour/contour/internal/dag"
"github.com/projectcontour/contour/internal/fixture"
"github.com/projectcontour/contour/internal/ref"
"github.com/projectcontour/contour/internal/xds"
contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3"
"github.com/projectcontour/contour/internal/xdscache"
Expand All @@ -39,6 +40,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking_v1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -48,6 +50,7 @@ func TestGRPC(t *testing.T) {
// tr and et is recreated before the start of each test.
var et *EndpointsTranslator
var eh *contour.EventHandler
var est *EndpointSliceTranslator

tests := map[string]func(*testing.T, *grpc.ClientConn){
"StreamClusters": func(t *testing.T, cc *grpc.ClientConn) {
Expand Down Expand Up @@ -104,6 +107,39 @@ func TestGRPC(t *testing.T) {
checkrecv(t, stream) // check we receive one notification
checktimeout(t, stream) // check that the second receive times out
},
"StreamEndpointSlices": func(t *testing.T, cc *grpc.ClientConn) {
et.OnAdd(&discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-scheduler",
Namespace: "kube-system",
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{
"130.211.139.167",
},
},
},
Ports: []discoveryv1.EndpointPort{
{
Port: ref.To[int32](80),
},
{
Port: ref.To[int32](80),
},
},
}, false)

eds := envoy_service_endpoint_v3.NewEndpointDiscoveryServiceClient(cc)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
stream, err := eds.StreamEndpoints(ctx)
require.NoError(t, err)
sendreq(t, stream, resource.EndpointType) // send initial notification
checkrecv(t, stream) // check we receive one notification
checktimeout(t, stream) // check that the second receive times out
},
"StreamListeners": func(t *testing.T, cc *grpc.ClientConn) {
// add an ingress, which will create a non tls listener
eh.OnAdd(&networking_v1.Ingress{
Expand Down Expand Up @@ -200,12 +236,14 @@ func TestGRPC(t *testing.T) {
for name, fn := range tests {
t.Run(name, func(t *testing.T) {
et = NewEndpointsTranslator(fixture.NewTestLogger(t))
est = NewEndpointSliceTranslator(fixture.NewTestLogger(t))

resources := []xdscache.ResourceCache{
&ListenerCache{},
&SecretCache{},
&RouteCache{},
&ClusterCache{},
est,
et,
&RuntimeCache{},
}
Expand Down

0 comments on commit 3d60d99

Please sign in to comment.