Skip to content

Commit

Permalink
skipper: discover redis shards without fetching full cluster state (#…
Browse files Browse the repository at this point in the history
…2934)

Skipper supports Redis based Cluster Ratelimits and
dynamic discovery of Redis instances using Kubernetes Service,
see https://github.com/zalando/skipper/blob/master/docs/tutorials/ratelimit.md#redis-based-cluster-ratelimits

Dynamic discovery relies on Kubernetes Dataclient that also fetches
Ingresses and RouteGroups and creates routes from them.

This change enables dynamic discovery of Redis instances using
Kubernetes without fetching Ingresses and RouteGroups.

When `-kubernetes` flag is not set but `-enable-swarm`,
`-kubernetes-redis-service-namespace`, `-kubernetes-redis-service-name` and
`-kubernetes-redis-service-port` are provided then Skipper creates
Kubernetes dataclient to discover Redis instances
but does not use this dataclient for routing.

The change adds LoadEndpointAddresses to Kubernetes dataclient that is
similar to caching GetEndpointAddresses but does not rely on
previous call to Load or LoadAll to allow discovery of
Redis shards without fetching full cluster state.

Fixes #2476

Signed-off-by: Alexander Yastrebov <[email protected]>
  • Loading branch information
AlexanderYastrebov authored Feb 16, 2024
1 parent 7edee31 commit 37c474a
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 131 deletions.
26 changes: 26 additions & 0 deletions dataclients/kubernetes/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,29 @@ func TestClientGetEndpointAddresses(t *testing.T) {
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
}

func TestClientLoadEndpointAddresses(t *testing.T) {
t.Run("from endpoints", func(t *testing.T) {
client := newTestClient(t,
kubernetes.Options{},
"testdata/ingressV1/ingress-data/lb-target-multi.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.2", "42.0.1.3"}, addrs)
})

t.Run("from endpointslices", func(t *testing.T) {
client := newTestClient(t,
kubernetes.Options{
KubernetesEnableEndpointslices: true,
},
"testdata/ingressV1/ingress-data/lb-target-multi-multiple-endpointslices-conditions-all-ready.yaml",
)

addrs, err := client.LoadEndpointAddresses("namespace1", "service1")
assert.NoError(t, err)
assert.Equal(t, []string{"42.0.1.1", "42.0.1.2", "42.0.1.3", "42.0.1.4"}, addrs)
})
}
44 changes: 41 additions & 3 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,13 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e

// loadEndpointSlices is different from the other load$Kind()
// functions because there are 1..N endpointslices created for a given
// service. endpointslices need to be deduplicated and state needs to
// service. Endpointslices need to be deduplicated and state needs to
// be checked. We read all endpointslices and create de-duplicated
// business objects skipperEndpointSlice instead of raw Kubernetes
// business objects [skipperEndpointSlice] instead of raw Kubernetes
// objects, because we need just a clean list of load balancer
// members. The returned map will return the full list of ready
// non-terminating endpoints that should be in the load balancer of a
// given service, check endpointSlice.ToResourceID().
// given service, check [endpointSlice.ToResourceID].
func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skipperEndpointSlice, error) {
var endpointSlices endpointSliceList
if err := c.getJSON(c.endpointSlicesURI+c.endpointSlicesLabelSelectors, &endpointSlices); err != nil {
Expand All @@ -497,6 +497,10 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
}
log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))

return collectReadyEndpoints(&endpointSlices), nil
}

func collectReadyEndpoints(endpointSlices *endpointSliceList) map[definitions.ResourceID]*skipperEndpointSlice {
mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
for _, endpointSlice := range endpointSlices.Items {
resID := endpointSlice.ToResourceID() // service resource ID
Expand Down Expand Up @@ -550,6 +554,40 @@ func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skippe
result[resID].Endpoints = append(result[resID].Endpoints, o)
}
}
return result
}

// loadEndpointAddresses returns the sorted list of all ready addresses for the
// given service, queried directly from the endpoints or endpointslices API
// depending on c.enableEndpointSlices. It does not touch cached cluster state.
func (c *clusterClient) loadEndpointAddresses(namespace, name string) ([]string, error) {
	if !c.enableEndpointSlices {
		// Legacy endpoints API: a single endpoints object per service.
		uri := fmt.Sprintf(EndpointsNamespaceFmt, namespace) + "/" + name

		var ep endpoint
		if err := c.getJSON(uri, &ep); err != nil {
			return nil, fmt.Errorf("requesting endpoints for %s/%s failed: %w", namespace, name, err)
		}

		addrs := ep.addresses()
		sort.Strings(addrs)
		return addrs, nil
	}

	// Endpointslices API: select all slices labeled with the service name.
	uri := fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace) +
		toLabelSelectorQuery(map[string]string{endpointSliceServiceNameLabel: name})

	var slices endpointSliceList
	if err := c.getJSON(uri, &slices); err != nil {
		return nil, fmt.Errorf("requesting endpointslices for %s/%s failed: %w", namespace, name, err)
	}

	// De-duplication groups slices by service, so exactly one entry is expected.
	ready := collectReadyEndpoints(&slices)
	if len(ready) != 1 {
		return nil, fmt.Errorf("unexpected number of endpoint slices for %s/%s: %d", namespace, name, len(ready))
	}

	var addrs []string
	for _, eps := range ready {
		addrs = eps.addresses()
		break
	}
	sort.Strings(addrs)
	return addrs, nil
}
Expand Down
4 changes: 3 additions & 1 deletion dataclients/kubernetes/endpointslices.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/zalando/skipper/dataclients/kubernetes/definitions"
)

const endpointSliceServiceNameLabel = "kubernetes.io/service-name"

// There are [1..N] Kubernetes endpointslices created for a single Kubernetes service.
// Kubernetes endpointslices of a given service can have duplicates with different states.
// Therefore Kubernetes endpointslices need to be de-duplicated before usage.
Expand Down Expand Up @@ -97,7 +99,7 @@ type endpointSlice struct {

// ToResourceID returns the same resource ID for every endpointslice that
// belongs to one service: the slice's namespace combined with the value of
// the kubernetes.io/service-name label.
func (eps *endpointSlice) ToResourceID() definitions.ResourceID {
	return newResourceID(eps.Meta.Namespace, eps.Meta.Labels[endpointSliceServiceNameLabel])
}
Expand Down
8 changes: 7 additions & 1 deletion dataclients/kubernetes/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ func (c *Client) fetchDefaultFilterConfigs() defaultFilters {
return filters
}

// GetEndpointAddresses returns the list of all addresses for the given service.
// GetEndpointAddresses returns the list of all addresses for the given service
// loaded by previous call to LoadAll or LoadUpdate.
func (c *Client) GetEndpointAddresses(ns, name string) []string {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -584,6 +585,11 @@ func (c *Client) GetEndpointAddresses(ns, name string) []string {
return c.state.getEndpointAddresses(ns, name)
}

// LoadEndpointAddresses returns the list of all addresses for the given service.
// Unlike GetEndpointAddresses it queries the cluster directly and therefore
// does not require a previous call to LoadAll or LoadUpdate.
func (c *Client) LoadEndpointAddresses(namespace, name string) ([]string, error) {
	return c.ClusterClient.loadEndpointAddresses(namespace, name)
}

func compareStringList(a, b []string) []string {
c := make([]string, 0)
for i := len(a) - 1; i >= 0; i-- {
Expand Down
51 changes: 33 additions & 18 deletions docs/tutorials/ratelimit.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,36 +94,51 @@ and your dataclient in use.

### Redis based Cluster Ratelimits

This solution is independent of the dataclient being used. You have
to run N number of [Redis](https://redis.io) instances, where N is >
0.
This solution is independent of the dataclient being used.
You have to run one or more [Redis](https://redis.io) instances.
See also [Running with Redis based Cluster Ratelimits](../kubernetes/ingress-controller.md#redis-based).

There are 2 different configurations to assign redis instances as a skipper redis swarm.
There are 3 different configurations to assign Redis instances as a Skipper Redis swarm.

- Static:
- Specify `-swarm-redis-urls`, multiple instances can be separated
by `,`, for example: `-swarm-redis-urls=redis1:6379,redis2:6379`. Use this if you don't need to scale your redis instances.
#### Static

- Kubernetes Service Selector:
- Specify `-kubernetes-redis-service-namespace=default` and `-kubernetes-redis-service-name=redis`, this will create kubernetes dataclient which will take care of collecting redis instances endpoints. This will allow you to scale redis instances as much as you want.
Specify `-swarm-redis-urls`, multiple instances can be separated by comma,
for example: `-swarm-redis-urls=redis1:6379,redis2:6379`.
Use this if you don't need to scale your Redis instances.

- HTTP Endpoint
- Specify `-swarm-redis-remote=http://127.0.0.1/redis/endpoints`, this will make skipper pull endpoints from this remote URL and work with them.
#### Kubernetes Service Selector

see also [Running with
Redis based Cluster Ratelimits](../kubernetes/ingress-controller.md#redis-based)
Specify `-kubernetes-redis-service-namespace=<namespace>`, `-kubernetes-redis-service-name=<name>`
and optional `-kubernetes-redis-service-port=<port number>`.

#### Redis Swarm Configuration
Skipper will update Redis addresses every 10 seconds from specified service endpoints.
This allows you to dynamically scale Redis instances.
Note that when `-kubernetes` is set Skipper also fetches `Ingresses` and `RouteGroups` for routing,
see [ingress-controller deployment docs](../kubernetes/ingress-controller.md).

When working with Redis swarm, use Kubernetes service selector. Configure it with `-kubernetes-redis-service-namespace` and `-kubernetes-redis-service-name` flags.
#### HTTP Endpoint

Auto-discovery routine for new Redis endpoints will be triggered every 10 seconds.
Specify `-swarm-redis-remote=http://127.0.0.1/redis/endpoints`.

If you have [routesrv proxy](https://opensource.zalando.com/skipper/kubernetes/ingress-controller/#routesrv) enabled, you need to configure Skipper with the flag `-swarm-redis-remote=http://skipper.ingress.cluster.local/swarm/redis/shards` where value is the service address for `routesrv` service. `Routesrv` will be responsible for collecting Redis endpoints and Skipper will poll them from the proxy.
Skipper will update Redis addresses every 10 seconds from this remote URL
that should return data in the following JSON format:
```json
{
"endpoints": [
{"address": "10.2.0.1:6379"}, {"address": "10.2.0.2:6379"},
{"address": "10.2.0.3:6379"}, {"address": "10.2.0.4:6379"},
{"address": "10.2.0.5:6379"}
]
}
```

If you have [routesrv proxy](https://opensource.zalando.com/skipper/kubernetes/ingress-controller/#routesrv) enabled,
you need to configure Skipper with the flag `-swarm-redis-remote=http://<routesrv-service-name>.<routesrv-namespace>.svc.cluster.local/swarm/redis/shards`.
`Routesrv` will be responsible for collecting Redis endpoints and Skipper will poll them from it.

#### Implementation

The implementation use [redis ring](https://godoc.org/github.com/go-redis/redis#Ring)
The implementation uses [Redis ring](https://godoc.org/github.com/go-redis/redis#Ring)
to be able to shard via client hashing and spread the load across
multiple Redis instances to be able to scale out the shared storage.

Expand Down
Loading

0 comments on commit 37c474a

Please sign in to comment.