diff --git a/dataclients/kubernetes/client_test.go b/dataclients/kubernetes/client_test.go
index 4d274c9952..0dc2ac36bf 100644
--- a/dataclients/kubernetes/client_test.go
+++ b/dataclients/kubernetes/client_test.go
@@ -86,3 +86,29 @@ func TestClientGetEndpointAddresses(t *testing.T) {
 		assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
 	})
 }
+
+func TestClientLoadEndpointAddresses(t *testing.T) {
+	t.Run("from endpoints", func(t *testing.T) {
+		client := newTestClient(t,
+			kubernetes.Options{},
+			"testdata/ingressV1/ingress-data/lb-target-multi.yaml",
+		)
+
+		addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
+		assert.NoError(t, err)
+		assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
+	})
+
+	t.Run("from endpointslices", func(t *testing.T) {
+		client := newTestClient(t,
+			kubernetes.Options{
+				KubernetesEnableEndpointslices: true,
+			},
+			"testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml",
+		)
+
+		addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
+		assert.NoError(t, err)
+		assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
+	})
+}
diff --git a/dataclients/kubernetes/clusterclient.go b/dataclients/kubernetes/clusterclient.go
index f09684910d..87f76b278e 100644
--- a/dataclients/kubernetes/clusterclient.go
+++ b/dataclients/kubernetes/clusterclient.go
@@ -482,13 +482,13 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e
 
 // loadEndpointSlices is different from the other load$Kind()
 // functions because there are 1..N endpointslices created for a given
-// service. endpointslices need to be deduplicated and state needs to
+// service. Endpointslices need to be deduplicated and state needs to
 // be checked. We read all endpointslices and create de-duplicated
-// business objects skipperEndpointSlice instead of raw Kubernetes
+// business objects [skipperEndpointSlice] instead of raw Kubernetes
 // objects, because we need just a clean list of load balancer
 // members. The returned map will return the full list of ready
 // non-terminating endpoints that should be in the load balancer of a
-// given service, check endpointSlice.ToResourceID().
+// given service, check [endpointSlice.ToResourceID].
 func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skipperEndpointSlice, error) {
 	var endpointSlices endpointSliceList
 	if err := c.getJSON(c.endpointSlicesURI+c.endpointSlicesLabelSelectors, &endpointSlices); err != nil {
@@ -497,6 +497,10 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 	}
 	log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))
 
+	return collectReadyEndpoints(&endpointSlices), nil
+}
+
+func collectReadyEndpoints(endpointSlices *endpointSliceList) map[definitions.ResourceID]*skipperEndpointSlice {
 	mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
 	for _, endpointSlice := range endpointSlices.Items {
 		resID := endpointSlice.ToResourceID() // service resource ID
@@ -550,6 +554,40 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
 			result[resID].Endpoints = append(result[resID].Endpoints, o)
 		}
 	}
+	return result
+}
+
+// loadEndpointAddresses returns the list of all addresses for the given service using the endpoints or endpointslices API.
+func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
+	var result []string
+	if c.enableEndpointSlices {
+		url := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
+			toLabelSelectorQuery(map[string]string{endpointSliceServiceNameLabel: name})
+
+		var endpointSlices endpointSliceList
+		if err := c.getJSON(url, &endpointSlices); err != nil {
+			return nil, fmt.Errorf("requesting endpointslices for %s/%s failed: %w", namespace, name, err)
+		}
+
+		ready := collectReadyEndpoints(&endpointSlices)
+		if len(ready) != 1 {
+			return nil, fmt.Errorf("unexpected number of endpoint slices for %s/%s: %d", namespace, name, len(ready))
+		}
+
+		for _, eps := range ready {
+			result = eps.addresses()
+			break
+		}
+	} else {
+		url := fmt.Sprintf(EndpointsNamespaceFmt, namespace) + "/" + name
+
+		var ep endpoint
+		if err := c.getJSON(url, &ep); err != nil {
+			return nil, fmt.Errorf("requesting endpoints for %s/%s failed: %w", namespace, name, err)
+		}
+		result = ep.addresses()
+	}
+	sort.Strings(result)
+	return result, nil
+}
diff --git a/dataclients/kubernetes/endpointslices.go b/dataclients/kubernetes/endpointslices.go
index 436c3e4a42..e91818c80b 100644
--- a/dataclients/kubernetes/endpointslices.go
+++ b/dataclients/kubernetes/endpointslices.go
@@ -4,6 +4,8 @@ import (
 	"github.com/zalando/skipper/dataclients/kubernetes/definitions"
 )
 
+const endpointSliceServiceNameLabel = "kubernetes.io/service-name"
+
 // There are [1..N] Kubernetes endpointslices created for a single Kubernetes service.
 // Kubernetes endpointslices of a given service can have duplicates with different states.
 // Therefore Kubernetes endpointslices need to be de-duplicated before usage.
@@ -97,7 +99,7 @@ type endpointSlice struct {
 
 // ToResourceID returns the same string for a group endpointlisces created for the same svc
 func (eps *endpointSlice) ToResourceID() definitions.ResourceID {
-	svcName := eps.Meta.Labels["kubernetes.io/service-name"]
+	svcName := eps.Meta.Labels[endpointSliceServiceNameLabel]
 	namespace := eps.Meta.Namespace
 	return newResourceID(namespace, svcName)
 }
diff --git a/dataclients/kubernetes/kube.go b/dataclients/kubernetes/kube.go
index 7810fe7486..523b71fdd8 100644
--- a/dataclients/kubernetes/kube.go
+++ b/dataclients/kubernetes/kube.go
@@ -574,7 +574,8 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
 	return filters
 }
 
-// GetEndpointAddresses returns the list of all addresses for the given service.
+// GetEndpointAddresses returns the list of all addresses for the given service
+// loaded by a previous call to LoadAll or LoadUpdate.
 func (c *Client) GetEndpointAddresses(ns, name string) []string {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -584,6 +585,11 @@
 	return c.state.getEndpointAddresses(ns, name)
 }
 
+// LoadEndpointAddresses returns the list of all addresses for the given service.
+func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
+	return c.ClusterClient.loadEndpointAddresses(namespace, name)
+}
+
 func compareStringList(a, b []string) []string {
 	c := make([]string, 0)
 	for i := len(a) - 1; i >= 0; i-- {
diff --git a/docs/tutorials/ratelimit.md b/docs/tutorials/ratelimit.md
index c2a0e7fa02..2625b1fdac 100644
--- a/docs/tutorials/ratelimit.md
+++ b/docs/tutorials/ratelimit.md
@@ -94,36 +94,51 @@ and your dataclient in use.
 
 ### Redis based Cluster Ratelimits
 
-This solution is independent of the dataclient being used. You have
-to run N number of [Redis](https://redis.io) instances, where N is >
-0.
+This solution is independent of the dataclient being used.
+You have to run one or more [Redis](https://redis.io) instances.
+See also [Running with Redis based Cluster Ratelimits](../kubernetes/ingress-controller.md#redis-based).
 
-There are 2 different configurations to assign redis instances as a skipper redis swarm.
+There are 3 different configurations to assign Redis instances as a Skipper Redis swarm.
 
-- Static:
-  - Specify `-swarm-redis-urls`, multiple instances can be separated
-    by `,`, for example: `-swarm-redis-urls=redis1:6379,redis2:6379`. Use this if you don't need to scale your redis instances.
+#### Static
 
-- Kubernetes Service Selector:
-  - Specify `-kubernetes-redis-service-namespace=default` and `-kubernetes-redis-service-name=redis`, this will create kubernetes dataclient which will take care of collecting redis instances endpoints. This will allow you to scale redis instances as much as you want.
+Specify `-swarm-redis-urls`, multiple instances can be separated by comma,
+for example: `-swarm-redis-urls=redis1:6379,redis2:6379`.
+Use this if you don't need to scale your Redis instances.
 
-- HTTP Endpoint
-  - Specify `-swarm-redis-remote=http://127.0.0.1/redis/endpoints`, this will make skipper pull endpoints from this remote URL and work with them.
+#### Kubernetes Service Selector
 
-see also [Running with
-Redis based Cluster Ratelimits](../kubernetes/ingress-controller.md#redis-based)
+Specify `-kubernetes-redis-service-namespace=<namespace>`, `-kubernetes-redis-service-name=<name>`
+and optionally `-kubernetes-redis-service-port=<port>`.
 
-#### Redis Swarm Configuration
+Skipper will update Redis addresses every 10 seconds from the specified service endpoints.
+This allows you to dynamically scale Redis instances.
+Note that when `-kubernetes` is set, Skipper also fetches `Ingresses` and `RouteGroups` for routing,
+see [ingress-controller deployment docs](../kubernetes/ingress-controller.md).
 
-When working with Redis swarm, use Kubernetes service selector. Configure it with `-kubernetes-redis-service-namespace` and `-kubernetes-redis-service-name` flags.
+#### HTTP Endpoint
 
-Auto-discovery routine for new Redis endpoints will be triggered every 10 seconds.
+Specify `-swarm-redis-remote=http://127.0.0.1/redis/endpoints`.
 
-If you have [routesrv proxy](https://opensource.zalando.com/skipper/kubernetes/ingress-controller/#routesrv) enabled, you need to configure Skipper with the flag `-swarm-redis-remote=http://skipper.ingress.cluster.local/swarm/redis/shards` where value is the service address for `routesrv` service. `Routesrv` will be responsible for collecting Redis endpoints and Skipper will poll them from the proxy.
+Skipper will update Redis addresses every 10 seconds from this remote URL
+that should return data in the following JSON format:
+
+```json
+{
+    "endpoints": [
+        {"address": "10.2.0.1:6379"}, {"address": "10.2.0.2:6379"},
+        {"address": "10.2.0.3:6379"}, {"address": "10.2.0.4:6379"},
+        {"address": "10.2.0.5:6379"}
+    ]
+}
+```
+
+If you have [routesrv proxy](https://opensource.zalando.com/skipper/kubernetes/ingress-controller/#routesrv) enabled,
+you need to configure Skipper with the flag `-swarm-redis-remote=http://<routesrv-service-name>.<namespace>.svc.cluster.local/swarm/redis/shards`.
+`Routesrv` will be responsible for collecting Redis endpoints and Skipper will poll them from it.
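+
+For local testing of the remote URL option, any HTTP server that returns the
+JSON format above will do. The following Go sketch is illustrative only; the
+handler path and the addresses are made up and not prescribed by Skipper:
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"net/http"
+)
+
+type redisEndpoint struct {
+	Address string `json:"address"`
+}
+
+func main() {
+	http.HandleFunc("/redis/endpoints", func(w http.ResponseWriter, _ *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		// Skipper polls this URL every 10 seconds, as described above,
+		// and replaces its Redis shards with the returned addresses.
+		json.NewEncoder(w).Encode(map[string][]redisEndpoint{
+			"endpoints": {{Address: "10.2.0.1:6379"}, {Address: "10.2.0.2:6379"}},
+		})
+	})
+	log.Fatal(http.ListenAndServe(":8080", nil))
+}
+```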
 
 #### Implementation
 
-The implementation use [redis ring](https://godoc.org/github.com/go-redis/redis#Ring)
+The implementation uses [Redis ring](https://godoc.org/github.com/go-redis/redis#Ring)
 to be able to shard via client hashing and spread the load across
 multiple Redis instances to be able to scale out the shared storage.
diff --git a/redis_test.go b/redis_test.go
index b4bff88256..6be128dbc7 100644
--- a/redis_test.go
+++ b/redis_test.go
@@ -8,7 +8,6 @@ import (
 	"net"
 	"net/http"
 	stdlibhttptest "net/http/httptest"
-	"net/url"
 	"os"
 	"syscall"
 	"testing"
@@ -20,6 +19,7 @@ import (
 	flog "github.com/zalando/skipper/filters/accesslog"
 	fscheduler "github.com/zalando/skipper/filters/scheduler"
 	"github.com/zalando/skipper/loadbalancer"
+	"github.com/zalando/skipper/metrics"
 	"github.com/zalando/skipper/metrics/metricstest"
 	"github.com/zalando/skipper/net/httptest"
 	"github.com/zalando/skipper/net/redistest"
@@ -82,27 +82,15 @@ spec:
 
 	// apiserver1
 	redisSpec1 := createRedisEndpointsSpec(t, redis1)
-	apiServer1, u1, err := createApiserver(kubeSpec + redisSpec1)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver1: %v", err)
-	}
-	defer apiServer1.Close()
+	apiServer1 := createApiserver(t, kubeSpec+redisSpec1)
 
 	// apiserver2
 	redisSpec2 := createRedisEndpointsSpec(t, redis1, redis2)
-	apiServer2, u2, err := createApiserver(kubeSpec + redisSpec2)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver2: %v", err)
-	}
-	defer apiServer2.Close()
+	apiServer2 := createApiserver(t, kubeSpec+redisSpec2)
 
 	// apiserver3
 	redisSpec3 := createRedisEndpointsSpec(t, redis1, redis2, redis3)
-	apiServer3, u3, err := createApiserver(kubeSpec + redisSpec3)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver3: %v", err)
-	}
-	defer apiServer3.Close()
+	apiServer3 := createApiserver(t, kubeSpec+redisSpec3)
 
 	// create skipper as LB to kube-apiservers
 	fr := createFilterRegistry(fscheduler.NewFifo(), flog.NewEnableAccessLog())
@@ -113,15 +101,13 @@ spec:
 	})
 	defer reg.Close()
 
-	docFmt := `
-r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-`
-	docApiserver := fmt.Sprintf(docFmt, u1.String(), u2.String(), u3.String(), u1.String(), u2.String(), u3.String())
+	docApiserver := fmt.Sprintf(`r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;`,
+		apiServer1.URL, apiServer2.URL, apiServer3.URL)
+
 	dc, err := testdataclient.NewDoc(docApiserver)
-	if err != nil {
-		t.Fatalf("Failed to create testdataclient: %v", err)
-	}
+	require.NoError(t, err)
+	defer dc.Close()
+
 	endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
 
 	// create LB in front of apiservers to be able to switch the data served by apiserver
@@ -149,7 +135,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 	defer lb.Close()
 
 	rsvo := skipper.Options{
-		Address:                         ":8082",
+		Address:                         findAddress(t),
 		KubernetesURL:                   lb.URL,
 		KubernetesRedisServiceNamespace: "skipper",
 		KubernetesRedisServiceName:      "redis",
@@ -160,21 +146,15 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 
 	go routesrv.Run(rsvo)
 
-	for {
-		rsp, _ := http.DefaultClient.Head("http://localhost:8082/routes")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+rsvo.Address+"/routes", 1*time.Second)
 
 	// run skipper proxy that we want to test
 	o := skipper.Options{
-		Address:                         ":9090",
+		Address:                         findAddress(t),
 		EnableRatelimiters:              true,
 		EnableSwarm:                     true,
 		Kubernetes:                      true,
-		SwarmRedisEndpointsRemoteURL:    "http://localhost:8082/swarm/redis/shards",
+		SwarmRedisEndpointsRemoteURL:    "http://" + rsvo.Address + "/swarm/redis/shards",
 		KubernetesURL:                   lb.URL,
 		KubernetesHealthcheck:           true,
 		SourcePollTimeout:               1500 * time.Millisecond,
@@ -182,26 +162,19 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 		ClusterRatelimitMaxGroupShards:  2,
 		SwarmRedisDialTimeout:           100 * time.Millisecond,
 		SuppressRouteUpdateLogs:         false,
-		SupportListener:                 ":9091",
+		SupportListener:                 findAddress(t),
 		SwarmRedisUpdateInterval:        time.Second,
 	}
 
+	runResult := make(chan error)
 	sigs := make(chan os.Signal, 1)
-	go skipper.RunWithShutdown(o, sigs, nil)
-
-	for i := 0; i < 10; i++ {
-		t.Logf("Waiting for proxy being ready")
+	go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
 
-		rsp, _ := http.DefaultClient.Get("http://localhost:9090/kube-system/healthz")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+o.Address+"/kube-system/healthz", 1*time.Second)
 
 	rate := 10
 	sec := 5
-	va := httptest.NewVegetaAttacker("http://localhost:9090/test", rate, time.Second, time.Second)
+	va := httptest.NewVegetaAttacker("http://"+o.Address+"/test", rate, time.Second, time.Second)
 	va.Attack(io.Discard, time.Duration(sec)*time.Second, "mytest")
 
 	successRate := va.Success()
@@ -231,6 +204,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 	assert.InEpsilon(t, sec*rate-(1*sec), countLimited, epsilon, fmt.Sprintf("Test should have limited requests between %d and %d", countLimited-int(epsilon), countLimited+int(epsilon)))
 
 	sigs <- syscall.SIGTERM
+	assert.NoError(t, <-runResult)
 }
 
 func TestConcurrentKubernetesClusterStateAccess(t *testing.T) {
@@ -281,27 +255,15 @@ spec:
 
 	// apiserver1
 	redisSpec1 := createRedisEndpointsSpec(t, redis1)
-	apiServer1, u1, err := createApiserver(kubeSpec + redisSpec1)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver1: %v", err)
-	}
-	defer apiServer1.Close()
+	apiServer1 := createApiserver(t, kubeSpec+redisSpec1)
 
 	// apiserver2
 	redisSpec2 := createRedisEndpointsSpec(t, redis1, redis2)
-	apiServer2, u2, err := createApiserver(kubeSpec + redisSpec2)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver2: %v", err)
-	}
-	defer apiServer2.Close()
+	apiServer2 := createApiserver(t, kubeSpec+redisSpec2)
 
 	// apiserver3
 	redisSpec3 := createRedisEndpointsSpec(t, redis1, redis2, redis3)
-	apiServer3, u3, err := createApiserver(kubeSpec + redisSpec3)
-	if err != nil {
-		t.Fatalf("Failed to start apiserver3: %v", err)
-	}
-	defer apiServer3.Close()
+	apiServer3 := createApiserver(t, kubeSpec+redisSpec3)
 
 	// create skipper as LB to kube-apiservers
 	fr := createFilterRegistry(fscheduler.NewFifo(), flog.NewEnableAccessLog())
@@ -312,15 +274,13 @@ spec:
 	})
 	defer reg.Close()
 
-	docFmt := `
-r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
-`
-	docApiserver := fmt.Sprintf(docFmt, u1.String(), u2.String(), u3.String(), u1.String(), u2.String(), u3.String())
+	docApiserver := fmt.Sprintf(`r1: * -> enableAccessLog(4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;`,
+		apiServer1.URL, apiServer2.URL, apiServer3.URL)
+
 	dc, err := testdataclient.NewDoc(docApiserver)
-	if err != nil {
-		t.Fatalf("Failed to create testdataclient: %v", err)
-	}
+	require.NoError(t, err)
+	defer dc.Close()
+
 	endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{})
 
 	// create LB in front of apiservers to be able to switch the data served by apiserver
@@ -349,7 +309,7 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 
 	// run skipper proxy that we want to test
 	o := skipper.Options{
-		Address:                         ":9090",
+		Address:                         findAddress(t),
 		EnableRatelimiters:              true,
 		EnableSwarm:                     true,
 		Kubernetes:                      true,
@@ -363,26 +323,19 @@ r2: PathRegexp("/endpoints") -> enableAccessLog(2,4,5) -> fifo(100,100,"3s") -> <roundRobin, "%s", "%s", "%s">;
 		ClusterRatelimitMaxGroupShards:  2,
 		SwarmRedisDialTimeout:           100 * time.Millisecond,
 		SuppressRouteUpdateLogs:         false,
-		SupportListener:                 ":9091",
+		SupportListener:                 findAddress(t),
 		SwarmRedisUpdateInterval:        time.Second,
 	}
 
+	runResult := make(chan error)
 	sigs := make(chan os.Signal, 1)
-	go skipper.RunWithShutdown(o, sigs, nil)
-
-	for i := 0; i < 10; i++ {
-		t.Logf("Waiting for proxy being ready")
+	go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
 
-		rsp, _ := http.DefaultClient.Get("http://localhost:9090/kube-system/healthz")
-		if rsp != nil && rsp.StatusCode == 200 {
-			break
-		}
-		time.Sleep(100 * time.Millisecond)
-	}
+	waitForOK(t, "http://"+o.Address+"/kube-system/healthz", 1*time.Second)
 
 	rate := 10
 	sec := 5
-	va := httptest.NewVegetaAttacker("http://localhost:9090/test", rate, time.Second, time.Second)
+	va := httptest.NewVegetaAttacker("http://"+o.Address+"/test", rate, time.Second, time.Second)
 	va.Attack(io.Discard, time.Duration(sec)*time.Second, "mytest")
 
 	t.Logf("Success [0..1]: %0.2f", va.Success())
@@ -403,16 +356,176 @@
 	}
 
 	sigs <- syscall.SIGTERM
+	assert.NoError(t, <-runResult)
 }
 
-func createApiserver(spec string) (*stdlibhttptest.Server, *url.URL, error) {
+func TestRedisAddrUpdater(t *testing.T) {
+	dm := metrics.Default
+	t.Cleanup(func() { metrics.Default = dm })
+
+	const redisUpdateInterval = 10 * time.Millisecond
+	const kubeSpec = `
+apiVersion: zalando.org/v1
+kind: RouteGroup
+metadata:
+  name: target
+spec:
+  backends:
+  - name: shunt
+    type: shunt
+  defaultBackends:
+  - backendName: shunt
+  routes:
+  - pathSubtree: /test
+    filters:
+    - inlineContent("OK")
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    application: skipper-ingress-redis
+  name: redis
+  namespace: skipper
+spec:
+  clusterIP: None
+  ports:
+  - port: 6379
+    protocol: TCP
+    targetPort: 6379
+  selector:
+    application: skipper-ingress-redis
+  type: ClusterIP
+`
+
+	t.Run("without kubernetes dataclient", func(t *testing.T) {
+		spec := kubeSpec + createRedisEndpointsSpec(t, "10.2.0.1:6379", "10.2.0.2:6379", "10.2.0.3:6379")
+		apiServer := createApiserver(t, spec)
+
+		metrics := &metricstest.MockMetrics{}
+
+		o := skipper.Options{
+			Address:                         findAddress(t),
+			EnableRatelimiters:              true,
+			EnableSwarm:                     true,
+			Kubernetes:                      false, // do not enable kubernetes dataclient
+			KubernetesURL:                   apiServer.URL,
+			KubernetesRedisServiceNamespace: "skipper",
+			KubernetesRedisServiceName:      "redis",
+			KubernetesRedisServicePort:      6379,
+			SwarmRedisUpdateInterval:        redisUpdateInterval,
+			InlineRoutes:                    `Path("/ready") -> inlineContent("OK") -> <shunt>`,
+			MetricsBackend:                  metrics,
+		}
+
+		runResult := make(chan error)
+		sigs := make(chan os.Signal, 1)
+		go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }()
+
+		waitForOK(t, "http://"+o.Address+"/ready", 1*time.Second)
+		time.Sleep(2 * redisUpdateInterval)
+
+		metrics.WithGauges(func(g map[string]float64) {
+			t.Logf("gauges: %v", g)
+
+			assert.Equal(t, 1.0, g["routes.total"], "expected only the /ready route")
+			assert.Equal(t, 3.0, g["swarm.redis.shards"])
g["swarm.redis.shards"]) + }) + + sigs <- syscall.SIGTERM + assert.NoError(t, <-runResult) + }) + + t.Run("kubernetes dataclient", func(t *testing.T) { + spec := kubeSpec + createRedisEndpointsSpec(t, "10.2.0.1:6379", "10.2.0.2:6379", "10.2.0.3:6379", "10.2.0.4:6379") + apiServer := createApiserver(t, spec) + + metrics := &metricstest.MockMetrics{} + + o := skipper.Options{ + Address: findAddress(t), + EnableRatelimiters: true, + EnableSwarm: true, + Kubernetes: true, // enable kubernetes dataclient + KubernetesURL: apiServer.URL, + KubernetesRedisServiceNamespace: "skipper", + KubernetesRedisServiceName: "redis", + KubernetesRedisServicePort: 6379, + SwarmRedisUpdateInterval: redisUpdateInterval, + MetricsBackend: metrics, + } + + runResult := make(chan error) + sigs := make(chan os.Signal, 1) + go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }() + + waitForOK(t, "http://"+o.Address+"/test", 1*time.Second) + time.Sleep(2 * redisUpdateInterval) + + metrics.WithGauges(func(g map[string]float64) { + t.Logf("gauges: %v", g) + + assert.Equal(t, 1.0, g["routes.total"], "expected only the /test route") + assert.Equal(t, 4.0, g["swarm.redis.shards"]) + }) + + sigs <- syscall.SIGTERM + assert.NoError(t, <-runResult) + }) + + t.Run("remote url", func(t *testing.T) { + eps := stdlibhttptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Write([]byte(`{ + "endpoints": [ + {"address": "10.2.0.1:6379"}, {"address": "10.2.0.2:6379"}, + {"address": "10.2.0.3:6379"}, {"address": "10.2.0.4:6379"}, + {"address": "10.2.0.5:6379"} + ] + }`)) + })) + defer eps.Close() + + metrics := &metricstest.MockMetrics{} + + o := skipper.Options{ + Address: findAddress(t), + EnableRatelimiters: true, + EnableSwarm: true, + SwarmRedisEndpointsRemoteURL: eps.URL, + SwarmRedisUpdateInterval: redisUpdateInterval, + InlineRoutes: `Path("/ready") -> inlineContent("OK") -> `, + MetricsBackend: metrics, + } + + runResult := make(chan error) + sigs := make(chan os.Signal, 1) + go func() { runResult <- skipper.RunWithShutdown(o, sigs, nil) }() + + waitForOK(t, "http://"+o.Address+"/ready", 1*time.Second) + time.Sleep(2 * redisUpdateInterval) + + metrics.WithGauges(func(g map[string]float64) { + t.Logf("gauges: %v", g) + + assert.Equal(t, 1.0, g["routes.total"], "expected only the /ready route") + assert.Equal(t, 5.0, g["swarm.redis.shards"]) + }) + + sigs <- syscall.SIGTERM + assert.NoError(t, <-runResult) + }) +} + +func createApiserver(t *testing.T, spec string) *stdlibhttptest.Server { + t.Helper() + api, err := kubernetestest.NewAPI(kubernetestest.TestAPIOptions{}, bytes.NewBufferString(spec)) - if err != nil { - return nil, nil, err - } + require.NoError(t, err) + apiServer := stdlibhttptest.NewServer(api) - u, err := url.Parse(apiServer.URL) - return apiServer, u, err + t.Cleanup(apiServer.Close) + + return apiServer } func createFilterRegistry(specs ...filters.Spec) filters.Registry { @@ -457,3 +570,37 @@ func createRedisEndpointsSpec(t *testing.T, addrs ...string) string { return fmt.Sprintf("---\n%s\n", b) } + +func findAddress(t *testing.T) string { + t.Helper() + + l, err := net.ListenTCP("tcp6", &net.TCPAddr{}) + require.NoError(t, err) + + addr := l.Addr().String() + require.NoError(t, l.Close()) + + return addr +} + +func waitForOK(t *testing.T, url string, timeout time.Duration) { + t.Helper() + + to := time.After(timeout) + for { + rsp, err := http.DefaultClient.Get(url) + if err == nil { + rsp.Body.Close() + if rsp.StatusCode == http.StatusOK { + return + } + } + + 
+		case <-to:
+			t.Fatalf("timeout waiting for %s", url)
+		default:
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}
diff --git a/skipper.go b/skipper.go
index e118182a90..3457ffcbf7 100644
--- a/skipper.go
+++ b/skipper.go
@@ -1359,22 +1359,35 @@ func findKubernetesDataclient(dataClients []routing.DataClient) *kubernetes.Clie
 	return kdc
 }
 
-func getRedisUpdaterFunc(opts *Options, kdc *kubernetes.Client) func() ([]string, error) {
-	// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
-	// has polled the data once or kdc.GetEndpointAdresses should be blocking
-	// call to kubernetes API
-	return func() ([]string, error) {
-		a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
-		log.Debugf("Redis updater called and found %d redis endpoints", len(a))
+func getKubernetesRedisAddrUpdater(opts *Options, kdc *kubernetes.Client, loaded bool) func() ([]string, error) {
+	if loaded {
+		// TODO(sszuecs): make sure kubernetes dataclient is already initialized and
+		// has polled the data once or kdc.GetEndpointAddresses should be blocking
+		// call to kubernetes API
+		return func() ([]string, error) {
+			a := kdc.GetEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
+			log.Debugf("GetEndpointAddresses found %d redis endpoints", len(a))
+
+			return joinPort(a, opts.KubernetesRedisServicePort), nil
+		}
+	} else {
+		return func() ([]string, error) {
+			a, err := kdc.LoadEndpointAddresses(opts.KubernetesRedisServiceNamespace, opts.KubernetesRedisServiceName)
+			log.Debugf("LoadEndpointAddresses found %d redis endpoints, err: %v", len(a), err)
 
-		port := strconv.Itoa(opts.KubernetesRedisServicePort)
-		for i := 0; i < len(a); i++ {
-			a[i] = net.JoinHostPort(a[i], port)
+			return joinPort(a, opts.KubernetesRedisServicePort), err
 		}
-		return a, nil
 	}
 }
 
+func joinPort(addrs []string, port int) []string {
+	p := strconv.Itoa(port)
+	for i := 0; i < len(addrs); i++ {
+		addrs[i] = net.JoinHostPort(addrs[i], p)
+	}
+	return addrs
+}
+
 type RedisEndpoint struct {
 	Address string `json:"address"`
 }
@@ -1383,7 +1396,7 @@ type RedisEndpoints struct {
 	Endpoints []RedisEndpoint `json:"endpoints"`
 }
 
-func updateEndpointsFromURL(address string) func() ([]string, error) {
+func getRemoteURLRedisAddrUpdater(address string) func() ([]string, error) {
 	/* #nosec */
 	return func() ([]string, error) {
 		resp, err := http.Get(address)
@@ -1690,25 +1703,32 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error {
 
 		kdc := findKubernetesDataclient(dataClients)
 		if kdc != nil {
-			redisOptions.AddrUpdater = getRedisUpdaterFunc(&o, kdc)
-			_, err = redisOptions.AddrUpdater()
+			redisOptions.AddrUpdater = getKubernetesRedisAddrUpdater(&o, kdc, true)
+		} else {
+			kdc, err := kubernetes.New(o.KubernetesDataClientOptions())
 			if err != nil {
-				log.Errorf("Failed to update redis address %v", err)
 				return err
 			}
-		} else {
-			log.Errorf("Failed to find kubernetes dataclient, but redis shards should be get by kubernetes svc %s/%s", o.KubernetesRedisServiceNamespace, o.KubernetesRedisServiceName)
+			defer kdc.Close()
+
+			redisOptions.AddrUpdater = getKubernetesRedisAddrUpdater(&o, kdc, false)
+		}
+
+		_, err = redisOptions.AddrUpdater()
+		if err != nil {
+			log.Errorf("Failed to update redis addresses from kubernetes: %v", err)
+			return err
+		}
 	} else if redisOptions != nil && o.SwarmRedisEndpointsRemoteURL != "" {
 		log.Infof("Use remote address %s to fetch updates redis shards", o.SwarmRedisEndpointsRemoteURL)
-		redisOptions.AddrUpdater = updateEndpointsFromURL(o.SwarmRedisEndpointsRemoteURL)
+		redisOptions.AddrUpdater = getRemoteURLRedisAddrUpdater(o.SwarmRedisEndpointsRemoteURL)
+
 		_, err = redisOptions.AddrUpdater()
 		if err != nil {
-			log.Errorf("Failed to update redis endpoints from URL %v", err)
+			log.Errorf("Failed to update redis addresses from URL: %v", err)
 			return err
 		}
 	}
-	}
 
 	var ratelimitRegistry *ratelimit.Registry
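
Note on the joinPort helper extracted in this patch: it delegates to net.JoinHostPort,
which brackets IPv6 hosts, so both address updaters return valid host:port pairs for
both address families. A self-contained sketch of that behavior (joinPort is copied
from the patch; the main function and sample addresses are illustrative only):

	package main

	import (
		"fmt"
		"net"
		"strconv"
	)

	// joinPort appends the configured Redis service port to each
	// endpoint address returned by the Kubernetes dataclient.
	func joinPort(addrs []string, port int) []string {
		p := strconv.Itoa(port)
		for i := 0; i < len(addrs); i++ {
			addrs[i] = net.JoinHostPort(addrs[i], p)
		}
		return addrs
	}

	func main() {
		// IPv4 hosts become "host:port", IPv6 hosts "[host]:port".
		fmt.Println(joinPort([]string{"10.2.0.1", "2001:db8::1"}, 6379))
		// Output: [10.2.0.1:6379 [2001:db8::1]:6379]
	}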