Skip to content

Commit

Permalink
feature: fetch and use endpointslices if available (#2565)
Browse files Browse the repository at this point in the history
increase minor version because of the importance of the change and the required RBAC clusterrole change

refactor: split out endpointslices and endpoints from ingress definitions
refactor: split out non service resources and rename file
refactor: value receiver -> ptr receiver
doc: -enable-kubernetes-endpointslices=true to enable EndpointSlices to scale out more than 1000 endpoints
doc: change RBAC clusterroles to reflect the endpointslices change
test: add coverage for Targets()
test: dataclients/kubernetes add failing testcase for named service target port
fix: empty port name is fine, because if so there is only one allowed by kubernetes itself, otherwise port has a mandatory name

Signed-off-by: Alexander Yastrebov <[email protected]>
Signed-off-by: Sandor Szücs <[email protected]>
  • Loading branch information
szuecs authored Sep 18, 2023
1 parent 8a3aba7 commit ce18cd9
Show file tree
Hide file tree
Showing 94 changed files with 2,727 additions and 316 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.17
v0.18
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type Config struct {
KubernetesPathModeString string `yaml:"kubernetes-path-mode"`
KubernetesPathMode kubernetes.PathMode `yaml:"-"`
KubernetesNamespace string `yaml:"kubernetes-namespace"`
KubernetesEnableEndpointSlices bool `yaml:"enable-kubernetes-endpointslices"`
KubernetesEnableEastWest bool `yaml:"enable-kubernetes-east-west"`
KubernetesEastWestDomain string `yaml:"kubernetes-east-west-domain"`
KubernetesEastWestRangeDomains *listFlag `yaml:"kubernetes-east-west-range-domains"`
Expand Down Expand Up @@ -448,6 +449,7 @@ func NewConfig() *Config {
flag.StringVar(&cfg.WhitelistedHealthCheckCIDR, "whitelisted-healthcheck-cidr", "", "sets the iprange/CIDRS to be whitelisted during healthcheck")
flag.StringVar(&cfg.KubernetesPathModeString, "kubernetes-path-mode", "kubernetes-ingress", "controls the default interpretation of Kubernetes ingress paths: <kubernetes-ingress|path-regexp|path-prefix>")
flag.StringVar(&cfg.KubernetesNamespace, "kubernetes-namespace", "", "watch only this namespace for ingresses")
flag.BoolVar(&cfg.KubernetesEnableEndpointSlices, "enable-kubernetes-endpointslices", false, "Enables that skipper fetches Kubernetes endpointslices instead of endpoints to scale more than 1000 pods within a service")
flag.BoolVar(&cfg.KubernetesEnableEastWest, "enable-kubernetes-east-west", false, "*Deprecated*: use kubernetes-east-west-range feature. Enables east-west communication, which automatically adds routes for Ingress objects with hostname <name>.<namespace>.skipper.cluster.local")
flag.StringVar(&cfg.KubernetesEastWestDomain, "kubernetes-east-west-domain", "", "*Deprecated*: use kubernetes-east-west-range feature. Sets the east-west domain, defaults to .skipper.cluster.local")
flag.Var(cfg.KubernetesEastWestRangeDomains, "kubernetes-east-west-range-domains", "set the the cluster internal domains for east west traffic. Identified routes to such domains will include the -kubernetes-east-west-range-predicates")
Expand Down Expand Up @@ -790,6 +792,7 @@ func (c *Config) ToOptions() skipper.Options {
WhitelistedHealthCheckCIDR: whitelistCIDRS,
KubernetesPathMode: c.KubernetesPathMode,
KubernetesNamespace: c.KubernetesNamespace,
KubernetesEnableEndpointslices: c.KubernetesEnableEndpointSlices,
KubernetesEnableEastWest: c.KubernetesEnableEastWest,
KubernetesEastWestDomain: c.KubernetesEastWestDomain,
KubernetesEastWestRangeDomains: c.KubernetesEastWestRangeDomains.values,
Expand Down
166 changes: 129 additions & 37 deletions dataclients/kubernetes/clusterclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,18 @@ const (
IngressesV1ClusterURI = "/apis/networking.k8s.io/v1/ingresses"
ZalandoResourcesClusterURI = "/apis/zalando.org/v1"
RouteGroupsName = "routegroups"
routeGroupsClusterURI = "/apis/zalando.org/v1/routegroups"
RouteGroupsClusterURI = "/apis/zalando.org/v1/routegroups"
routeGroupClassKey = "zalando.org/routegroup.class"
ServicesClusterURI = "/api/v1/services"
EndpointsClusterURI = "/api/v1/endpoints"
EndpointSlicesClusterURI = "/apis/discovery.k8s.io/v1/endpointslices"
SecretsClusterURI = "/api/v1/secrets"
defaultKubernetesURL = "http://localhost:8001"
IngressesV1NamespaceFmt = "/apis/networking.k8s.io/v1/namespaces/%s/ingresses"
routeGroupsNamespaceFmt = "/apis/zalando.org/v1/namespaces/%s/routegroups"
RouteGroupsNamespaceFmt = "/apis/zalando.org/v1/namespaces/%s/routegroups"
ServicesNamespaceFmt = "/api/v1/namespaces/%s/services"
EndpointsNamespaceFmt = "/api/v1/namespaces/%s/endpoints"
EndpointSlicesNamespaceFmt = "/apis/discovery.k8s.io/v1/namespaces/%s/endpointslices"
SecretsNamespaceFmt = "/api/v1/namespaces/%s/secrets"
serviceAccountDir = "/var/run/secrets/kubernetes.io/serviceaccount/"
serviceAccountTokenKey = "token"
Expand All @@ -55,6 +57,7 @@ type clusterClient struct {
routeGroupsURI string
servicesURI string
endpointsURI string
endpointSlicesURI string
secretsURI string
tokenProvider secrets.SecretsProvider
tokenFile string
Expand All @@ -65,11 +68,14 @@ type clusterClient struct {
ingressClass *regexp.Regexp
httpClient *http.Client

ingressLabelSelectors string
servicesLabelSelectors string
endpointsLabelSelectors string
secretsLabelSelectors string
routeGroupsLabelSelectors string
ingressLabelSelectors string
servicesLabelSelectors string
endpointsLabelSelectors string
endpointSlicesLabelSelectors string
secretsLabelSelectors string
routeGroupsLabelSelectors string

enableEndpointSlices bool

loggedMissingRouteGroups bool
routeGroupValidator *definitions.RouteGroupValidator
Expand Down Expand Up @@ -149,22 +155,25 @@ func newClusterClient(o Options, apiURL, ingCls, rgCls string, quit <-chan struc
}

c := &clusterClient{
ingressesURI: IngressesV1ClusterURI,
routeGroupsURI: routeGroupsClusterURI,
servicesURI: ServicesClusterURI,
endpointsURI: EndpointsClusterURI,
secretsURI: SecretsClusterURI,
ingressClass: ingClsRx,
ingressLabelSelectors: toLabelSelectorQuery(o.IngressLabelSelectors),
servicesLabelSelectors: toLabelSelectorQuery(o.ServicesLabelSelectors),
endpointsLabelSelectors: toLabelSelectorQuery(o.EndpointsLabelSelectors),
secretsLabelSelectors: toLabelSelectorQuery(o.SecretsLabelSelectors),
routeGroupsLabelSelectors: toLabelSelectorQuery(o.RouteGroupsLabelSelectors),
routeGroupClass: rgClsRx,
httpClient: httpClient,
apiURL: apiURL,
certificateRegistry: o.CertificateRegistry,
routeGroupValidator: &definitions.RouteGroupValidator{},
ingressesURI: IngressesV1ClusterURI,
routeGroupsURI: RouteGroupsClusterURI,
servicesURI: ServicesClusterURI,
endpointsURI: EndpointsClusterURI,
endpointSlicesURI: EndpointSlicesClusterURI,
secretsURI: SecretsClusterURI,
ingressClass: ingClsRx,
ingressLabelSelectors: toLabelSelectorQuery(o.IngressLabelSelectors),
servicesLabelSelectors: toLabelSelectorQuery(o.ServicesLabelSelectors),
endpointsLabelSelectors: toLabelSelectorQuery(o.EndpointsLabelSelectors),
endpointSlicesLabelSelectors: toLabelSelectorQuery(o.EndpointSlicesLabelSelectors),
secretsLabelSelectors: toLabelSelectorQuery(o.SecretsLabelSelectors),
routeGroupsLabelSelectors: toLabelSelectorQuery(o.RouteGroupsLabelSelectors),
routeGroupClass: rgClsRx,
httpClient: httpClient,
apiURL: apiURL,
certificateRegistry: o.CertificateRegistry,
routeGroupValidator: &definitions.RouteGroupValidator{},
enableEndpointSlices: o.KubernetesEnableEndpointslices,
}

if o.KubernetesInCluster {
Expand Down Expand Up @@ -220,9 +229,10 @@ func toLabelSelectorQuery(selectors map[string]string) string {

func (c *clusterClient) setNamespace(namespace string) {
c.ingressesURI = fmt.Sprintf(IngressesV1NamespaceFmt, namespace)
c.routeGroupsURI = fmt.Sprintf(routeGroupsNamespaceFmt, namespace)
c.routeGroupsURI = fmt.Sprintf(RouteGroupsNamespaceFmt, namespace)
c.servicesURI = fmt.Sprintf(ServicesNamespaceFmt, namespace)
c.endpointsURI = fmt.Sprintf(EndpointsNamespaceFmt, namespace)
c.endpointSlicesURI = fmt.Sprintf(EndpointSlicesNamespaceFmt, namespace)
c.secretsURI = fmt.Sprintf(SecretsNamespaceFmt, namespace)
}

Expand Down Expand Up @@ -455,6 +465,81 @@ func (c *clusterClient) loadEndpoints() (map[definitions.ResourceID]*endpoint, e
return result, nil
}

// loadEndpointSlices is different from the other load$Kind()
// functions because there are 1..N endpointslices created for a given
// service. endpointslices need to be deduplicated and state needs to
// be checked. We read all endpointslices and create de-duplicated
// business objects skipperEndpointSlice instead of raw Kubernetes
// objects, because we need just a clean list of load balancer
// members. The returned map will return the full list of ready
// non-terminating endpoints that should be in the load balancer of a
// given service, check endpointSlice.ToResourceID().
func (c *clusterClient) loadEndpointSlices() (map[definitions.ResourceID]*skipperEndpointSlice, error) {
var endpointSlices endpointSliceList

if err := c.getJSON(c.endpointSlicesURI+c.endpointSlicesLabelSelectors, &endpointSlices); err != nil {
log.Debugf("requesting all endpointslices failed: %v", err)
return nil, err
}

log.Debugf("all endpointslices received: %d", len(endpointSlices.Items))
mapSlices := make(map[definitions.ResourceID][]*endpointSlice)
for _, endpointSlice := range endpointSlices.Items {
resID := endpointSlice.ToResourceID() // service resource ID
mapSlices[resID] = append(mapSlices[resID], endpointSlice)
}

result := make(map[definitions.ResourceID]*skipperEndpointSlice)
for resID, epSlices := range mapSlices {
if len(epSlices) == 0 {
continue
}

result[resID] = &skipperEndpointSlice{
Meta: epSlices[0].Meta,
}

terminatingEps := make(map[string]struct{})
resEps := make(map[string]*skipperEndpoint)

for i := range epSlices {

for _, ep := range epSlices[i].Endpoints {
// Addresses [1..100] of the same AddressType, as kube-proxy we use the first
// see also https://github.com/kubernetes/kubernetes/issues/106267
address := ep.Addresses[0]
if _, ok := terminatingEps[address]; ok {
// already known terminating
} else if ep.isTerminating() {
terminatingEps[address] = struct{}{}
// if we had this one with a non terminating condition,
// we should delete it, because of eventual consistency
// it is actually terminating
delete(resEps, address)
} else if ep.Conditions == nil {
// if conditions are nil then we need to treat is as ready
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
}
} else if ep.isReady() {
resEps[address] = &skipperEndpoint{
Address: address,
Zone: ep.Zone,
}
}
}

result[resID].Ports = epSlices[i].Ports
}
for _, o := range resEps {
result[resID].Endpoints = append(result[resID].Endpoints, o)
}
}

return result, nil
}

func (c *clusterClient) logMissingRouteGroupsOnce() {
if c.loggedMissingRouteGroups {
return
Expand All @@ -468,7 +553,6 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
var (
err error
ingressesV1 []*definitions.IngressV1Item
secrets map[definitions.ResourceID]*secret
)
ingressesV1, err = c.loadIngressesV1()
if err != nil {
Expand All @@ -492,24 +576,32 @@ func (c *clusterClient) fetchClusterState() (*clusterState, error) {
return nil, err
}

endpoints, err := c.loadEndpoints()
if err != nil {
return nil, err
state := &clusterState{
ingressesV1: ingressesV1,
routeGroups: routeGroups,
services: services,
cachedEndpoints: make(map[endpointID][]string),
enableEndpointSlices: c.enableEndpointSlices,
}

if c.enableEndpointSlices {
state.endpointSlices, err = c.loadEndpointSlices()
if err != nil {
return nil, err
}
} else {
state.endpoints, err = c.loadEndpoints()
if err != nil {
return nil, err
}
}

if c.certificateRegistry != nil {
secrets, err = c.loadSecrets()
state.secrets, err = c.loadSecrets()
if err != nil {
return nil, err
}
}

return &clusterState{
ingressesV1: ingressesV1,
routeGroups: routeGroups,
services: services,
endpoints: endpoints,
secrets: secrets,
cachedEndpoints: make(map[endpointID][]string),
}, nil
return state, nil
}
75 changes: 54 additions & 21 deletions dataclients/kubernetes/clusterstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
)

type clusterState struct {
mu sync.Mutex
ingressesV1 []*definitions.IngressV1Item
routeGroups []*definitions.RouteGroupItem
services map[definitions.ResourceID]*service
endpoints map[definitions.ResourceID]*endpoint
secrets map[definitions.ResourceID]*secret
cachedEndpoints map[endpointID][]string
mu sync.Mutex
ingressesV1 []*definitions.IngressV1Item
routeGroups []*definitions.RouteGroupItem
services map[definitions.ResourceID]*service
endpoints map[definitions.ResourceID]*endpoint
endpointSlices map[definitions.ResourceID]*skipperEndpointSlice
secrets map[definitions.ResourceID]*secret
cachedEndpoints map[endpointID][]string
enableEndpointSlices bool
}

func (state *clusterState) getService(namespace, name string) (*service, error) {
Expand Down Expand Up @@ -47,6 +49,7 @@ func (state *clusterState) getServiceRG(namespace, name string) (*service, error
return s, nil
}

// GetEndpointsByService returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByService(namespace, name, protocol string, servicePort *servicePort) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Expand All @@ -60,18 +63,29 @@ func (state *clusterState) GetEndpointsByService(namespace, name, protocol strin
return cached
}

ep, ok := state.endpoints[epID.ResourceID]
if !ok {
return nil
var targets []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targetsByServicePort("TCP", protocol, servicePort)
} else {
return nil
}
} else {
if ep, ok := state.endpoints[epID.ResourceID]; ok {
targets = ep.targetsByServicePort(protocol, servicePort)
} else {
return nil
}
}

targets := ep.targetsByServicePort(protocol, servicePort)
sort.Strings(targets)
state.cachedEndpoints[epID] = targets
return targets
}

func (state *clusterState) GetEndpointsByName(namespace, name, protocol string) []string {
// GetEndpointsByName returns the skipper endpoints for kubernetes endpoints or endpointslices.
// This function works only correctly for endpointslices (and likely endpoints) with one port with the same protocol ("TCP", "UDP").
func (state *clusterState) GetEndpointsByName(namespace, name, protocol, scheme string) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
Expand All @@ -82,19 +96,29 @@ func (state *clusterState) GetEndpointsByName(namespace, name, protocol string)
return cached
}

ep, ok := state.endpoints[epID.ResourceID]
if !ok {
return nil
var targets []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targets(protocol, scheme)
} else {
return nil
}
} else {
if ep, ok := state.endpoints[epID.ResourceID]; ok {
targets = ep.targets(scheme)
} else {
return nil
}
}

targets := ep.targets(protocol)
sort.Strings(targets)
state.cachedEndpoints[epID] = targets
return targets

}

func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol string, target *definitions.BackendPort) []string {
// GetEndpointsByTarget returns the skipper endpoints for kubernetes endpoints or endpointslices.
func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol, scheme string, target *definitions.BackendPort) []string {
epID := endpointID{
ResourceID: newResourceID(namespace, name),
Protocol: protocol,
Expand All @@ -107,12 +131,21 @@ func (state *clusterState) GetEndpointsByTarget(namespace, name, protocol string
return cached
}

ep, ok := state.endpoints[epID.ResourceID]
if !ok {
return nil
var targets []string
if state.enableEndpointSlices {
if eps, ok := state.endpointSlices[epID.ResourceID]; ok {
targets = eps.targetsByServiceTarget(protocol, scheme, target)
} else {
return nil
}
} else {
if ep, ok := state.endpoints[epID.ResourceID]; ok {
targets = ep.targetsByServiceTarget(scheme, target)
} else {
return nil
}
}

targets := ep.targetsByServiceTarget(protocol, target)
sort.Strings(targets)
state.cachedEndpoints[epID] = targets
return targets
Expand Down
Loading

0 comments on commit ce18cd9

Please sign in to comment.